Improve exception robustness.

- Introduced `test_swap_edge_cases.cc` with extensive tests for minimum payload sizes, truncated payloads, data overflows, unsupported encoding versions, CRC mismatches, and mixed valid/invalid records to ensure reliability under complex scenarios.
- Enhanced `main.cc` with a top-level exception handler to prevent data loss and ensure cleanup during unexpected failures.
This commit is contained in:
2026-02-17 20:12:09 -08:00
parent a21409e689
commit a428b204a0
6 changed files with 1203 additions and 199 deletions

309
Swap.cc
View File

@@ -598,24 +598,32 @@ SwapManager::write_header(int fd)
bool
SwapManager::open_ctx(JournalCtx &ctx, const std::string &path)
SwapManager::open_ctx(JournalCtx &ctx, const std::string &path, std::string &err)
{
err.clear();
if (ctx.fd >= 0)
return true;
if (!ensure_parent_dir(path))
if (!ensure_parent_dir(path)) {
err = "Failed to create parent directory for swap file: " + path;
return false;
}
int flags = O_CREAT | O_WRONLY | O_APPEND;
#ifdef O_CLOEXEC
flags |= O_CLOEXEC;
#endif
int fd = ::open(path.c_str(), flags, 0600);
if (fd < 0)
if (fd < 0) {
int saved_errno = errno;
err = "Failed to open swap file '" + path + "': " + std::strerror(saved_errno);
return false;
}
// Ensure permissions even if file already existed.
(void) ::fchmod(fd, 0600);
struct stat st{};
if (fstat(fd, &st) != 0) {
int saved_errno = errno;
::close(fd);
err = "Failed to fstat swap file '" + path + "': " + std::strerror(saved_errno);
return false;
}
// If an existing file is too small to contain the fixed header, truncate
@@ -627,8 +635,11 @@ SwapManager::open_ctx(JournalCtx &ctx, const std::string &path)
tflags |= O_CLOEXEC;
#endif
fd = ::open(path.c_str(), tflags, 0600);
if (fd < 0)
if (fd < 0) {
int saved_errno = errno;
err = "Failed to reopen swap file for truncation '" + path + "': " + std::strerror(saved_errno);
return false;
}
(void) ::fchmod(fd, 0600);
st.st_size = 0;
}
@@ -637,6 +648,9 @@ SwapManager::open_ctx(JournalCtx &ctx, const std::string &path)
if (st.st_size == 0) {
ctx.header_ok = write_header(fd);
ctx.approx_size_bytes = ctx.header_ok ? 64 : 0;
if (!ctx.header_ok) {
err = "Failed to write swap file header: " + path;
}
} else {
ctx.header_ok = true; // stage 1: trust existing header
ctx.approx_size_bytes = static_cast<std::uint64_t>(st.st_size);
@@ -658,12 +672,17 @@ SwapManager::close_ctx(JournalCtx &ctx)
bool
SwapManager::compact_to_checkpoint(JournalCtx &ctx, const std::vector<std::uint8_t> &chkpt_record)
SwapManager::compact_to_checkpoint(JournalCtx &ctx, const std::vector<std::uint8_t> &chkpt_record, std::string &err)
{
if (ctx.path.empty())
err.clear();
if (ctx.path.empty()) {
err = "Compact failed: empty path";
return false;
if (chkpt_record.empty())
}
if (chkpt_record.empty()) {
err = "Compact failed: empty checkpoint record";
return false;
}
// Close existing file before rename.
if (ctx.fd >= 0) {
@@ -675,30 +694,46 @@ SwapManager::compact_to_checkpoint(JournalCtx &ctx, const std::vector<std::uint8
const std::string tmp_path = ctx.path + ".tmp";
// Create the compacted file: header + checkpoint record.
if (!ensure_parent_dir(tmp_path))
if (!ensure_parent_dir(tmp_path)) {
err = "Failed to create parent directory for temp swap file: " + tmp_path;
return false;
}
int flags = O_CREAT | O_WRONLY | O_TRUNC;
#ifdef O_CLOEXEC
flags |= O_CLOEXEC;
#endif
int tfd = ::open(tmp_path.c_str(), flags, 0600);
if (tfd < 0)
if (tfd < 0) {
int saved_errno = errno;
err = "Failed to open temp swap file '" + tmp_path + "': " + std::strerror(saved_errno);
return false;
}
(void) ::fchmod(tfd, 0600);
bool ok = write_header(tfd);
if (ok)
ok = write_full(tfd, chkpt_record.data(), chkpt_record.size());
if (ok)
ok = (::fsync(tfd) == 0);
if (ok) {
if (::fsync(tfd) != 0) {
int saved_errno = errno;
err = "Failed to fsync temp swap file '" + tmp_path + "': " + std::strerror(saved_errno);
ok = false;
}
}
::close(tfd);
if (!ok) {
if (err.empty()) {
err = "Failed to write temp swap file: " + tmp_path;
}
std::remove(tmp_path.c_str());
return false;
}
// Atomic replace.
if (::rename(tmp_path.c_str(), ctx.path.c_str()) != 0) {
int saved_errno = errno;
err = "Failed to rename temp swap file '" + tmp_path + "' to '" + ctx.path + "': " + std::strerror(
saved_errno);
std::remove(tmp_path.c_str());
return false;
}
@@ -723,8 +758,10 @@ SwapManager::compact_to_checkpoint(JournalCtx &ctx, const std::vector<std::uint8
}
// Re-open for further appends.
if (!open_ctx(ctx, ctx.path))
if (!open_ctx(ctx, ctx.path, err)) {
// err already set by open_ctx
return false;
}
ctx.approx_size_bytes = 64 + static_cast<std::uint64_t>(chkpt_record.size());
return true;
}
@@ -969,7 +1006,13 @@ SwapManager::writer_loop()
continue;
for (const Pending &p: batch) {
process_one(p);
try {
process_one(p);
} catch (const std::exception &e) {
report_error(std::string("Exception in process_one: ") + e.what(), p.buf);
} catch (...) {
report_error("Unknown exception in process_one", p.buf);
}
{
std::lock_guard<std::mutex> lg(mtx_);
if (p.seq > last_processed_)
@@ -981,23 +1024,29 @@ SwapManager::writer_loop()
}
// Throttled fsync: best-effort (grouped)
std::vector<int> to_sync;
std::uint64_t now = now_ns();
{
std::lock_guard<std::mutex> lg(mtx_);
for (auto &kv: journals_) {
JournalCtx &ctx = kv.second;
if (ctx.fd >= 0) {
if (ctx.last_fsync_ns == 0 || (now - ctx.last_fsync_ns) / 1000000ULL >=
cfg_.fsync_interval_ms) {
ctx.last_fsync_ns = now;
to_sync.push_back(ctx.fd);
try {
std::vector<int> to_sync;
std::uint64_t now = now_ns();
{
std::lock_guard<std::mutex> lg(mtx_);
for (auto &kv: journals_) {
JournalCtx &ctx = kv.second;
if (ctx.fd >= 0) {
if (ctx.last_fsync_ns == 0 || (now - ctx.last_fsync_ns) / 1000000ULL >=
cfg_.fsync_interval_ms) {
ctx.last_fsync_ns = now;
to_sync.push_back(ctx.fd);
}
}
}
}
}
for (int fd: to_sync) {
(void) ::fsync(fd);
for (int fd: to_sync) {
(void) ::fsync(fd);
}
} catch (const std::exception &e) {
report_error(std::string("Exception in fsync operations: ") + e.what());
} catch (...) {
report_error("Unknown exception in fsync operations");
}
}
// Wake any waiters.
@@ -1010,70 +1059,90 @@ SwapManager::process_one(const Pending &p)
{
if (!p.buf)
return;
Buffer &buf = *p.buf;
JournalCtx *ctxp = nullptr;
std::string path;
std::size_t compact_bytes = 0;
{
std::lock_guard<std::mutex> lg(mtx_);
auto it = journals_.find(p.buf);
if (it == journals_.end())
try {
Buffer &buf = *p.buf;
JournalCtx *ctxp = nullptr;
std::string path;
std::size_t compact_bytes = 0;
{
std::lock_guard<std::mutex> lg(mtx_);
auto it = journals_.find(p.buf);
if (it == journals_.end())
return;
if (it->second.path.empty())
it->second.path = ComputeSidecarPath(buf);
path = it->second.path;
ctxp = &it->second;
compact_bytes = cfg_.compact_bytes;
}
if (!ctxp)
return;
if (it->second.path.empty())
it->second.path = ComputeSidecarPath(buf);
path = it->second.path;
ctxp = &it->second;
compact_bytes = cfg_.compact_bytes;
}
if (!ctxp)
return;
if (!open_ctx(*ctxp, path))
return;
if (p.payload.size() > 0xFFFFFFu)
return;
std::string open_err;
if (!open_ctx(*ctxp, path, open_err)) {
report_error(open_err, p.buf);
return;
}
if (p.payload.size() > 0xFFFFFFu) {
report_error("Payload too large: " + std::to_string(p.payload.size()) + " bytes", p.buf);
return;
}
// Build record: [type u8][len u24][payload][crc32 u32]
std::uint8_t len3[3];
put_u24_le(len3, static_cast<std::uint32_t>(p.payload.size()));
// Build record: [type u8][len u24][payload][crc32 u32]
std::uint8_t len3[3];
put_u24_le(len3, static_cast<std::uint32_t>(p.payload.size()));
std::uint8_t head[4];
head[0] = static_cast<std::uint8_t>(p.type);
head[1] = len3[0];
head[2] = len3[1];
head[3] = len3[2];
std::uint8_t head[4];
head[0] = static_cast<std::uint8_t>(p.type);
head[1] = len3[0];
head[2] = len3[1];
head[3] = len3[2];
std::uint32_t c = 0;
c = crc32(head, sizeof(head), c);
if (!p.payload.empty())
c = crc32(p.payload.data(), p.payload.size(), c);
std::uint8_t crcbytes[4];
crcbytes[0] = static_cast<std::uint8_t>(c & 0xFFu);
crcbytes[1] = static_cast<std::uint8_t>((c >> 8) & 0xFFu);
crcbytes[2] = static_cast<std::uint8_t>((c >> 16) & 0xFFu);
crcbytes[3] = static_cast<std::uint8_t>((c >> 24) & 0xFFu);
std::uint32_t c = 0;
c = crc32(head, sizeof(head), c);
if (!p.payload.empty())
c = crc32(p.payload.data(), p.payload.size(), c);
std::uint8_t crcbytes[4];
crcbytes[0] = static_cast<std::uint8_t>(c & 0xFFu);
crcbytes[1] = static_cast<std::uint8_t>((c >> 8) & 0xFFu);
crcbytes[2] = static_cast<std::uint8_t>((c >> 16) & 0xFFu);
crcbytes[3] = static_cast<std::uint8_t>((c >> 24) & 0xFFu);
std::vector<std::uint8_t> rec;
rec.reserve(sizeof(head) + p.payload.size() + sizeof(crcbytes));
rec.insert(rec.end(), head, head + sizeof(head));
if (!p.payload.empty())
rec.insert(rec.end(), p.payload.begin(), p.payload.end());
rec.insert(rec.end(), crcbytes, crcbytes + sizeof(crcbytes));
std::vector<std::uint8_t> rec;
rec.reserve(sizeof(head) + p.payload.size() + sizeof(crcbytes));
rec.insert(rec.end(), head, head + sizeof(head));
if (!p.payload.empty())
rec.insert(rec.end(), p.payload.begin(), p.payload.end());
rec.insert(rec.end(), crcbytes, crcbytes + sizeof(crcbytes));
// Write (handle partial writes and check results)
bool ok = write_full(ctxp->fd, rec.data(), rec.size());
if (ok) {
// Write (handle partial writes and check results)
bool ok = write_full(ctxp->fd, rec.data(), rec.size());
if (!ok) {
int err = errno;
report_error("Failed to write swap record to '" + path + "': " + std::strerror(err), p.buf);
return;
}
ctxp->approx_size_bytes += static_cast<std::uint64_t>(rec.size());
if (p.urgent_flush) {
(void) ::fsync(ctxp->fd);
if (::fsync(ctxp->fd) != 0) {
int err = errno;
report_error("Failed to fsync swap file '" + path + "': " + std::strerror(err), p.buf);
}
ctxp->last_fsync_ns = now_ns();
}
if (p.type == SwapRecType::CHKPT && compact_bytes > 0 &&
ctxp->approx_size_bytes >= static_cast<std::uint64_t>(compact_bytes)) {
(void) compact_to_checkpoint(*ctxp, rec);
std::string compact_err;
if (!compact_to_checkpoint(*ctxp, rec, compact_err)) {
report_error(compact_err, p.buf);
}
}
} catch (const std::exception &e) {
report_error(std::string("Exception in process_one: ") + e.what(), p.buf);
} catch (...) {
report_error("Unknown exception in process_one", p.buf);
}
(void) ok; // best-effort; future work could mark ctx error state
}
@@ -1184,8 +1253,10 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
switch (type) {
case SwapRecType::INS: {
std::size_t off = 0;
if (payload.empty()) {
err = "Swap record missing INS payload";
// INS payload: encver(1) + row(4) + col(4) + nbytes(4) + data(nbytes)
// Minimum: 1 + 4 + 4 + 4 = 13 bytes
if (payload.size() < 13) {
err = "INS payload too short (need at least 13 bytes)";
return false;
}
const std::uint8_t encver = payload[off++];
@@ -1196,7 +1267,7 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
std::uint32_t row = 0, col = 0, nbytes = 0;
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col) || !parse_u32_le(
payload, off, nbytes)) {
err = "Malformed INS payload";
err = "Malformed INS payload (failed to parse row/col/nbytes)";
return false;
}
if (off + nbytes > payload.size()) {
@@ -1209,8 +1280,10 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
}
case SwapRecType::DEL: {
std::size_t off = 0;
if (payload.empty()) {
err = "Swap record missing DEL payload";
// DEL payload: encver(1) + row(4) + col(4) + dlen(4)
// Minimum: 1 + 4 + 4 + 4 = 13 bytes
if (payload.size() < 13) {
err = "DEL payload too short (need at least 13 bytes)";
return false;
}
const std::uint8_t encver = payload[off++];
@@ -1221,7 +1294,7 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
std::uint32_t row = 0, col = 0, dlen = 0;
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col) || !parse_u32_le(
payload, off, dlen)) {
err = "Malformed DEL payload";
err = "Malformed DEL payload (failed to parse row/col/dlen)";
return false;
}
buf.delete_text((int) row, (int) col, (std::size_t) dlen);
@@ -1229,8 +1302,10 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
}
case SwapRecType::SPLIT: {
std::size_t off = 0;
if (payload.empty()) {
err = "Swap record missing SPLIT payload";
// SPLIT payload: encver(1) + row(4) + col(4)
// Minimum: 1 + 4 + 4 = 9 bytes
if (payload.size() < 9) {
err = "SPLIT payload too short (need at least 9 bytes)";
return false;
}
const std::uint8_t encver = payload[off++];
@@ -1240,7 +1315,7 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
}
std::uint32_t row = 0, col = 0;
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col)) {
err = "Malformed SPLIT payload";
err = "Malformed SPLIT payload (failed to parse row/col)";
return false;
}
buf.split_line((int) row, (int) col);
@@ -1248,8 +1323,10 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
}
case SwapRecType::JOIN: {
std::size_t off = 0;
if (payload.empty()) {
err = "Swap record missing JOIN payload";
// JOIN payload: encver(1) + row(4)
// Minimum: 1 + 4 = 5 bytes
if (payload.size() < 5) {
err = "JOIN payload too short (need at least 5 bytes)";
return false;
}
const std::uint8_t encver = payload[off++];
@@ -1259,7 +1336,7 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
}
std::uint32_t row = 0;
if (!parse_u32_le(payload, off, row)) {
err = "Malformed JOIN payload";
err = "Malformed JOIN payload (failed to parse row)";
return false;
}
buf.join_lines((int) row);
@@ -1267,8 +1344,10 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
}
case SwapRecType::CHKPT: {
std::size_t off = 0;
// CHKPT payload: encver(1) + nbytes(4) + data(nbytes)
// Minimum: 1 + 4 = 5 bytes
if (payload.size() < 5) {
err = "Malformed CHKPT payload";
err = "CHKPT payload too short (need at least 5 bytes)";
return false;
}
const std::uint8_t encver = payload[off++];
@@ -1278,7 +1357,7 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
}
std::uint32_t nbytes = 0;
if (!parse_u32_le(payload, off, nbytes)) {
err = "Malformed CHKPT payload";
err = "Malformed CHKPT payload (failed to parse nbytes)";
return false;
}
if (off + nbytes > payload.size()) {
@@ -1295,4 +1374,54 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
}
}
}
void
SwapManager::report_error(const std::string &message, Buffer *buf)
{
std::lock_guard<std::mutex> lg(mtx_);
SwapError err;
err.timestamp_ns = now_ns();
err.message = message;
if (buf && !buf->Filename().empty()) {
err.buffer_name = buf->Filename();
} else if (buf) {
err.buffer_name = "<unnamed>";
} else {
err.buffer_name = "<unknown>";
}
errors_.push_back(err);
// Bound the error queue to 100 entries
while (errors_.size() > 100) {
errors_.pop_front();
}
++total_error_count_;
}
bool
SwapManager::HasErrors() const
{
std::lock_guard<std::mutex> lg(mtx_);
return !errors_.empty();
}
std::string
SwapManager::GetLastError() const
{
std::lock_guard<std::mutex> lg(mtx_);
if (errors_.empty())
return "";
const SwapError &e = errors_.back();
return "[" + e.buffer_name + "] " + e.message;
}
std::size_t
SwapManager::GetErrorCount() const
{
std::lock_guard<std::mutex> lg(mtx_);
return total_error_count_;
}
} // namespace kte