Introduce swap journaling crash recovery system with tests.
- Added detailed journaling system (`SwapManager`) for crash recovery, including edit recording and replay. - Integrated recovery prompts for handling swap files during file open flows. - Implemented swap file cleanup, checkpointing, and compaction mechanisms. - Added extensive unit tests for swap-related behaviors such as recovery prompts, file pruning, and corruption handling. - Updated CMake to include new test files.
This commit is contained in:
610
Swap.cc
610
Swap.cc
@@ -22,6 +22,24 @@ constexpr std::uint8_t MAGIC[8] = {'K', 'T', 'E', '_', 'S', 'W', 'P', '\0'};
|
||||
constexpr std::uint32_t VERSION = 1;
|
||||
|
||||
|
||||
static std::string
|
||||
snapshot_buffer_bytes(const Buffer &b)
|
||||
{
|
||||
const auto &rows = b.Rows();
|
||||
std::string out;
|
||||
// Cheap lower bound: sum of row sizes.
|
||||
std::size_t approx = 0;
|
||||
for (const auto &r: rows)
|
||||
approx += r.size();
|
||||
out.reserve(approx);
|
||||
for (std::size_t i = 0; i < rows.size(); i++) {
|
||||
auto v = b.GetLineView(i);
|
||||
out.append(v.data(), v.size());
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
|
||||
static fs::path
|
||||
xdg_state_home()
|
||||
{
|
||||
@@ -38,6 +56,13 @@ xdg_state_home()
|
||||
}
|
||||
|
||||
|
||||
static fs::path
|
||||
swap_root_dir()
|
||||
{
|
||||
return xdg_state_home() / "kte" / "swap";
|
||||
}
|
||||
|
||||
|
||||
static std::uint64_t
|
||||
fnv1a64(std::string_view s)
|
||||
{
|
||||
@@ -82,6 +107,64 @@ write_full(int fd, const void *buf, size_t len)
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
static std::string
|
||||
encode_path_key(std::string s)
|
||||
{
|
||||
// Turn an absolute path like "/home/kyle/tmp/test.txt" into
|
||||
// "home!kyle!tmp!test.txt" so swap files are human-identifiable.
|
||||
//
|
||||
// Notes:
|
||||
// - We strip a single leading path separator so absolute paths don't start with '!'.
|
||||
// - We replace both '/' and '\\' with '!'.
|
||||
// - We leave other characters as-is (spaces are OK on POSIX).
|
||||
if (!s.empty() && (s[0] == '/' || s[0] == '\\'))
|
||||
s.erase(0, 1);
|
||||
for (char &ch: s) {
|
||||
if (ch == '/' || ch == '\\')
|
||||
ch = '!';
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
|
||||
static std::string
|
||||
compute_swap_path_for_filename(const std::string &filename)
|
||||
{
|
||||
if (filename.empty())
|
||||
return std::string();
|
||||
// Always place swap under an XDG home-appropriate state directory.
|
||||
// This avoids cluttering working directories and prevents stomping on
|
||||
// swap files when multiple different paths share the same basename.
|
||||
fs::path root = swap_root_dir();
|
||||
|
||||
fs::path p(filename);
|
||||
std::string key;
|
||||
try {
|
||||
key = fs::weakly_canonical(p).string();
|
||||
} catch (...) {
|
||||
try {
|
||||
key = fs::absolute(p).string();
|
||||
} catch (...) {
|
||||
key = filename;
|
||||
}
|
||||
}
|
||||
std::string encoded = encode_path_key(key);
|
||||
if (!encoded.empty()) {
|
||||
std::string name = encoded + ".swp";
|
||||
// Avoid filesystem/path length issues; fall back to hashed naming.
|
||||
// NAME_MAX is often 255 on POSIX, but keep extra headroom.
|
||||
if (name.size() <= 200) {
|
||||
return (root / name).string();
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: stable, shorter name based on basename + hash.
|
||||
std::string base = p.filename().string();
|
||||
const std::string name = base + "." + hex_u64(fnv1a64(key)) + ".swp";
|
||||
return (root / name).string();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -91,6 +174,11 @@ SwapManager::SwapManager()
|
||||
worker_ = std::thread([this] {
|
||||
this->writer_loop();
|
||||
});
|
||||
// Best-effort prune of old swap files.
|
||||
// Safe early in startup: journals_ is still empty and no fds are open yet.
|
||||
if (cfg_.prune_on_startup) {
|
||||
PruneSwapDir();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -109,6 +197,29 @@ SwapManager::~SwapManager()
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::Checkpoint(Buffer *buf)
|
||||
{
|
||||
if (buf) {
|
||||
RecordCheckpoint(*buf, false);
|
||||
return;
|
||||
}
|
||||
// All buffers
|
||||
std::vector<Buffer *> bufs;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
bufs.reserve(journals_.size());
|
||||
for (auto &kv: journals_) {
|
||||
bufs.push_back(kv.first);
|
||||
}
|
||||
}
|
||||
for (Buffer *b: bufs) {
|
||||
if (b)
|
||||
RecordCheckpoint(*b, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::Flush(Buffer *buf)
|
||||
{
|
||||
@@ -171,10 +282,14 @@ SwapManager::Attach(Buffer *buf)
|
||||
|
||||
|
||||
void
|
||||
SwapManager::Detach(Buffer *buf)
|
||||
SwapManager::Detach(Buffer *buf, const bool remove_file)
|
||||
{
|
||||
if (!buf)
|
||||
return;
|
||||
// Write a best-effort final checkpoint before suspending and closing.
|
||||
// If the caller requested removal, skip the final checkpoint so the file can be deleted.
|
||||
if (!remove_file)
|
||||
RecordCheckpoint(*buf, true);
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(buf);
|
||||
@@ -183,24 +298,162 @@ SwapManager::Detach(Buffer *buf)
|
||||
}
|
||||
}
|
||||
Flush(buf);
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(buf);
|
||||
if (it != journals_.end()) {
|
||||
close_ctx(it->second);
|
||||
journals_.erase(it);
|
||||
std::string path;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(buf);
|
||||
if (it != journals_.end()) {
|
||||
path = it->second.path;
|
||||
close_ctx(it->second);
|
||||
journals_.erase(it);
|
||||
}
|
||||
recorders_.erase(buf);
|
||||
}
|
||||
if (remove_file && !path.empty()) {
|
||||
(void) std::remove(path.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::ResetJournal(Buffer &buf)
|
||||
{
|
||||
std::string path;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end())
|
||||
return;
|
||||
JournalCtx &ctx = it->second;
|
||||
if (ctx.path.empty())
|
||||
ctx.path = ComputeSidecarPath(buf);
|
||||
path = ctx.path;
|
||||
ctx.suspended = true;
|
||||
}
|
||||
|
||||
Flush(&buf);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end())
|
||||
return;
|
||||
JournalCtx &ctx = it->second;
|
||||
close_ctx(ctx);
|
||||
ctx.header_ok = false;
|
||||
ctx.last_flush_ns = 0;
|
||||
ctx.last_fsync_ns = 0;
|
||||
ctx.last_chkpt_ns = 0;
|
||||
ctx.edit_bytes_since_chkpt = 0;
|
||||
ctx.approx_size_bytes = 0;
|
||||
ctx.suspended = false;
|
||||
}
|
||||
|
||||
if (!path.empty()) {
|
||||
(void) std::remove(path.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::string
|
||||
SwapManager::SwapDirRoot()
|
||||
{
|
||||
return swap_root_dir().string();
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::PruneSwapDir()
|
||||
{
|
||||
SwapConfig cfg;
|
||||
std::vector<std::string> active;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
cfg = cfg_;
|
||||
active.reserve(journals_.size());
|
||||
for (const auto &kv: journals_) {
|
||||
if (!kv.second.path.empty())
|
||||
active.push_back(kv.second.path);
|
||||
}
|
||||
}
|
||||
|
||||
const fs::path root = swap_root_dir();
|
||||
std::error_code ec;
|
||||
if (!fs::exists(root, ec) || ec)
|
||||
return;
|
||||
|
||||
struct Entry {
|
||||
fs::path path;
|
||||
std::filesystem::file_time_type mtime;
|
||||
};
|
||||
std::vector<Entry> swps;
|
||||
for (auto it = fs::directory_iterator(root, ec); !ec && it != fs::directory_iterator(); it.increment(ec)) {
|
||||
const fs::path p = it->path();
|
||||
if (p.extension() != ".swp")
|
||||
continue;
|
||||
// Never delete active journals.
|
||||
const std::string ps = p.string();
|
||||
bool is_active = false;
|
||||
for (const auto &a: active) {
|
||||
if (a == ps) {
|
||||
is_active = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (is_active)
|
||||
continue;
|
||||
std::error_code ec2;
|
||||
if (!it->is_regular_file(ec2) || ec2)
|
||||
continue;
|
||||
auto tm = fs::last_write_time(p, ec2);
|
||||
if (ec2)
|
||||
continue;
|
||||
swps.push_back({p, tm});
|
||||
}
|
||||
|
||||
if (swps.empty())
|
||||
return;
|
||||
|
||||
// Sort newest first.
|
||||
std::sort(swps.begin(), swps.end(), [](const Entry &a, const Entry &b) {
|
||||
return a.mtime > b.mtime;
|
||||
});
|
||||
|
||||
// Convert age threshold.
|
||||
auto now = std::filesystem::file_time_type::clock::now();
|
||||
auto max_age = std::chrono::hours(24) * static_cast<long long>(cfg.prune_max_age_days);
|
||||
|
||||
std::size_t kept = 0;
|
||||
for (const auto &e: swps) {
|
||||
bool too_old = false;
|
||||
if (cfg.prune_max_age_days > 0) {
|
||||
// If file_time_type isn't system_clock, duration arithmetic still works.
|
||||
if (now - e.mtime > max_age)
|
||||
too_old = true;
|
||||
}
|
||||
bool over_limit = (cfg.prune_max_files > 0) && (kept >= cfg.prune_max_files);
|
||||
if (too_old || over_limit) {
|
||||
std::error_code ec3;
|
||||
fs::remove(e.path, ec3);
|
||||
} else {
|
||||
++kept;
|
||||
}
|
||||
}
|
||||
recorders_.erase(buf);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::NotifyFilenameChanged(Buffer &buf)
|
||||
{
|
||||
// Best-effort: checkpoint the old journal before switching paths.
|
||||
RecordCheckpoint(buf, true);
|
||||
std::string old_path;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end())
|
||||
return;
|
||||
old_path = it->second.path;
|
||||
it->second.suspended = true;
|
||||
}
|
||||
Flush(&buf);
|
||||
@@ -210,8 +463,16 @@ SwapManager::NotifyFilenameChanged(Buffer &buf)
|
||||
return;
|
||||
JournalCtx &ctx = it->second;
|
||||
close_ctx(ctx);
|
||||
ctx.path = ComputeSidecarPath(buf);
|
||||
ctx.suspended = false;
|
||||
if (!old_path.empty())
|
||||
(void) std::remove(old_path.c_str());
|
||||
ctx.path = ComputeSidecarPath(buf);
|
||||
ctx.suspended = false;
|
||||
ctx.header_ok = false;
|
||||
ctx.last_flush_ns = 0;
|
||||
ctx.last_fsync_ns = 0;
|
||||
ctx.last_chkpt_ns = 0;
|
||||
ctx.edit_bytes_since_chkpt = 0;
|
||||
ctx.approx_size_bytes = 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -257,54 +518,9 @@ SwapManager::SuspendGuard::~SuspendGuard()
|
||||
std::string
|
||||
SwapManager::ComputeSidecarPath(const Buffer &buf)
|
||||
{
|
||||
// Always place swap under an XDG home-appropriate state directory.
|
||||
// This avoids cluttering working directories and prevents stomping on
|
||||
// swap files when multiple different paths share the same basename.
|
||||
fs::path root = xdg_state_home() / "kte" / "swap";
|
||||
|
||||
auto encode_path = [](std::string s) -> std::string {
|
||||
// Turn an absolute path like "/home/kyle/tmp/test.txt" into
|
||||
// "home!kyle!tmp!test.txt" so swap files are human-identifiable.
|
||||
//
|
||||
// Notes:
|
||||
// - We strip a single leading path separator so absolute paths don't start with '!'.
|
||||
// - We replace both '/' and '\\' with '!'.
|
||||
// - We leave other characters as-is (spaces are OK on POSIX).
|
||||
if (!s.empty() && (s[0] == '/' || s[0] == '\\'))
|
||||
s.erase(0, 1);
|
||||
for (char &ch: s) {
|
||||
if (ch == '/' || ch == '\\')
|
||||
ch = '!';
|
||||
}
|
||||
return s;
|
||||
};
|
||||
|
||||
fs::path root = swap_root_dir();
|
||||
if (!buf.Filename().empty()) {
|
||||
fs::path p(buf.Filename());
|
||||
std::string key;
|
||||
try {
|
||||
key = fs::weakly_canonical(p).string();
|
||||
} catch (...) {
|
||||
try {
|
||||
key = fs::absolute(p).string();
|
||||
} catch (...) {
|
||||
key = buf.Filename();
|
||||
}
|
||||
}
|
||||
std::string encoded = encode_path(key);
|
||||
if (!encoded.empty()) {
|
||||
std::string name = encoded + ".swp";
|
||||
// Avoid filesystem/path length issues; fall back to hashed naming.
|
||||
// NAME_MAX is often 255 on POSIX, but keep extra headroom.
|
||||
if (name.size() <= 200) {
|
||||
return (root / name).string();
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: stable, shorter name based on basename + hash.
|
||||
std::string base = p.filename().string();
|
||||
const std::string name = base + "." + hex_u64(fnv1a64(key)) + ".swp";
|
||||
return (root / name).string();
|
||||
return compute_swap_path_for_filename(buf.Filename());
|
||||
}
|
||||
|
||||
// Unnamed buffers: unique within the process.
|
||||
@@ -316,6 +532,20 @@ SwapManager::ComputeSidecarPath(const Buffer &buf)
|
||||
}
|
||||
|
||||
|
||||
std::string
|
||||
SwapManager::ComputeSwapPathForFilename(const std::string &filename)
|
||||
{
|
||||
return ComputeSidecarPathForFilename(filename);
|
||||
}
|
||||
|
||||
|
||||
std::string
|
||||
SwapManager::ComputeSidecarPathForFilename(const std::string &filename)
|
||||
{
|
||||
return compute_swap_path_for_filename(filename);
|
||||
}
|
||||
|
||||
|
||||
std::uint64_t
|
||||
SwapManager::now_ns()
|
||||
{
|
||||
@@ -402,9 +632,11 @@ SwapManager::open_ctx(JournalCtx &ctx, const std::string &path)
|
||||
ctx.fd = fd;
|
||||
ctx.path = path;
|
||||
if (st.st_size == 0) {
|
||||
ctx.header_ok = write_header(fd);
|
||||
ctx.header_ok = write_header(fd);
|
||||
ctx.approx_size_bytes = ctx.header_ok ? 64 : 0;
|
||||
} else {
|
||||
ctx.header_ok = true; // stage 1: trust existing header
|
||||
ctx.header_ok = true; // stage 1: trust existing header
|
||||
ctx.approx_size_bytes = static_cast<std::uint64_t>(st.st_size);
|
||||
}
|
||||
return ctx.header_ok;
|
||||
}
|
||||
@@ -422,6 +654,79 @@ SwapManager::close_ctx(JournalCtx &ctx)
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
SwapManager::compact_to_checkpoint(JournalCtx &ctx, const std::vector<std::uint8_t> &chkpt_record)
|
||||
{
|
||||
if (ctx.path.empty())
|
||||
return false;
|
||||
if (chkpt_record.empty())
|
||||
return false;
|
||||
|
||||
// Close existing file before rename.
|
||||
if (ctx.fd >= 0) {
|
||||
(void) ::fsync(ctx.fd);
|
||||
::close(ctx.fd);
|
||||
ctx.fd = -1;
|
||||
}
|
||||
ctx.header_ok = false;
|
||||
|
||||
const std::string tmp_path = ctx.path + ".tmp";
|
||||
// Create the compacted file: header + checkpoint record.
|
||||
if (!ensure_parent_dir(tmp_path))
|
||||
return false;
|
||||
|
||||
int flags = O_CREAT | O_WRONLY | O_TRUNC;
|
||||
#ifdef O_CLOEXEC
|
||||
flags |= O_CLOEXEC;
|
||||
#endif
|
||||
int tfd = ::open(tmp_path.c_str(), flags, 0600);
|
||||
if (tfd < 0)
|
||||
return false;
|
||||
(void) ::fchmod(tfd, 0600);
|
||||
bool ok = write_header(tfd);
|
||||
if (ok)
|
||||
ok = write_full(tfd, chkpt_record.data(), chkpt_record.size());
|
||||
if (ok)
|
||||
ok = (::fsync(tfd) == 0);
|
||||
::close(tfd);
|
||||
if (!ok) {
|
||||
std::remove(tmp_path.c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
// Atomic replace.
|
||||
if (::rename(tmp_path.c_str(), ctx.path.c_str()) != 0) {
|
||||
std::remove(tmp_path.c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
// Best-effort: fsync parent dir to persist the rename.
|
||||
try {
|
||||
fs::path p(ctx.path);
|
||||
fs::path dir = p.parent_path();
|
||||
if (!dir.empty()) {
|
||||
int dflags = O_RDONLY;
|
||||
#ifdef O_DIRECTORY
|
||||
dflags |= O_DIRECTORY;
|
||||
#endif
|
||||
int dfd = ::open(dir.string().c_str(), dflags);
|
||||
if (dfd >= 0) {
|
||||
(void) ::fsync(dfd);
|
||||
::close(dfd);
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
// ignore
|
||||
}
|
||||
|
||||
// Re-open for further appends.
|
||||
if (!open_ctx(ctx, ctx.path))
|
||||
return false;
|
||||
ctx.approx_size_bytes = 64 + static_cast<std::uint64_t>(chkpt_record.size());
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
std::uint32_t
|
||||
SwapManager::crc32(const std::uint8_t *data, std::size_t len, std::uint32_t seed)
|
||||
{
|
||||
@@ -510,6 +815,7 @@ SwapManager::RecordInsert(Buffer &buf, int row, int col, std::string_view text)
|
||||
p.payload.insert(p.payload.end(), reinterpret_cast<const std::uint8_t *>(text.data()),
|
||||
reinterpret_cast<const std::uint8_t *>(text.data()) + text.size());
|
||||
enqueue(std::move(p));
|
||||
maybe_request_checkpoint(buf, text.size());
|
||||
}
|
||||
|
||||
|
||||
@@ -533,6 +839,7 @@ SwapManager::RecordDelete(Buffer &buf, int row, int col, std::size_t len)
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, col)));
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(len));
|
||||
enqueue(std::move(p));
|
||||
maybe_request_checkpoint(buf, len);
|
||||
}
|
||||
|
||||
|
||||
@@ -553,6 +860,7 @@ SwapManager::RecordSplit(Buffer &buf, int row, int col)
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, row)));
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, col)));
|
||||
enqueue(std::move(p));
|
||||
maybe_request_checkpoint(buf, 1);
|
||||
}
|
||||
|
||||
|
||||
@@ -572,6 +880,68 @@ SwapManager::RecordJoin(Buffer &buf, int row)
|
||||
p.payload.push_back(1);
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, row)));
|
||||
enqueue(std::move(p));
|
||||
maybe_request_checkpoint(buf, 1);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::maybe_request_checkpoint(Buffer &buf, const std::size_t approx_edit_bytes)
|
||||
{
|
||||
SwapConfig cfg;
|
||||
bool do_chkpt = false;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
cfg = cfg_;
|
||||
if (cfg.checkpoint_bytes == 0 && cfg.checkpoint_interval_ms == 0)
|
||||
return;
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end() || it->second.suspended)
|
||||
return;
|
||||
JournalCtx &ctx = it->second;
|
||||
ctx.edit_bytes_since_chkpt += approx_edit_bytes;
|
||||
const std::uint64_t now = now_ns();
|
||||
if (ctx.last_chkpt_ns == 0)
|
||||
ctx.last_chkpt_ns = now;
|
||||
const bool bytes_hit = (cfg.checkpoint_bytes > 0) && (
|
||||
ctx.edit_bytes_since_chkpt >= cfg.checkpoint_bytes);
|
||||
const bool time_hit = (cfg.checkpoint_interval_ms > 0) &&
|
||||
(((now - ctx.last_chkpt_ns) / 1000000ULL) >= cfg.checkpoint_interval_ms);
|
||||
if (bytes_hit || time_hit) {
|
||||
ctx.edit_bytes_since_chkpt = 0;
|
||||
ctx.last_chkpt_ns = now;
|
||||
do_chkpt = true;
|
||||
}
|
||||
}
|
||||
if (do_chkpt) {
|
||||
RecordCheckpoint(buf, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::RecordCheckpoint(Buffer &buf, const bool urgent_flush)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end() || it->second.suspended)
|
||||
return;
|
||||
}
|
||||
|
||||
const std::string bytes = snapshot_buffer_bytes(buf);
|
||||
if (bytes.size() > 0xFFFFFFFFu)
|
||||
return;
|
||||
|
||||
Pending p;
|
||||
p.buf = &buf;
|
||||
p.type = SwapRecType::CHKPT;
|
||||
p.urgent_flush = urgent_flush;
|
||||
// payload v1: [encver u8=1][nbytes u32][bytes]
|
||||
p.payload.push_back(1);
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(bytes.size()));
|
||||
p.payload.insert(p.payload.end(), reinterpret_cast<const std::uint8_t *>(bytes.data()),
|
||||
reinterpret_cast<const std::uint8_t *>(bytes.data()) + bytes.size());
|
||||
enqueue(std::move(p));
|
||||
}
|
||||
|
||||
|
||||
@@ -641,17 +1011,17 @@ SwapManager::process_one(const Pending &p)
|
||||
|
||||
JournalCtx *ctxp = nullptr;
|
||||
std::string path;
|
||||
std::size_t compact_bytes = 0;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(p.buf);
|
||||
if (it == journals_.end())
|
||||
return;
|
||||
if (it->second.suspended)
|
||||
return;
|
||||
if (it->second.path.empty())
|
||||
it->second.path = ComputeSidecarPath(buf);
|
||||
path = it->second.path;
|
||||
ctxp = &it->second;
|
||||
path = it->second.path;
|
||||
ctxp = &it->second;
|
||||
compact_bytes = cfg_.compact_bytes;
|
||||
}
|
||||
if (!ctxp)
|
||||
return;
|
||||
@@ -680,13 +1050,27 @@ SwapManager::process_one(const Pending &p)
|
||||
crcbytes[2] = static_cast<std::uint8_t>((c >> 16) & 0xFFu);
|
||||
crcbytes[3] = static_cast<std::uint8_t>((c >> 24) & 0xFFu);
|
||||
|
||||
std::vector<std::uint8_t> rec;
|
||||
rec.reserve(sizeof(head) + p.payload.size() + sizeof(crcbytes));
|
||||
rec.insert(rec.end(), head, head + sizeof(head));
|
||||
if (!p.payload.empty())
|
||||
rec.insert(rec.end(), p.payload.begin(), p.payload.end());
|
||||
rec.insert(rec.end(), crcbytes, crcbytes + sizeof(crcbytes));
|
||||
|
||||
// Write (handle partial writes and check results)
|
||||
bool ok = write_full(ctxp->fd, head, sizeof(head));
|
||||
if (ok && !p.payload.empty())
|
||||
ok = write_full(ctxp->fd, p.payload.data(), p.payload.size());
|
||||
if (ok)
|
||||
ok = write_full(ctxp->fd, crcbytes, sizeof(crcbytes));
|
||||
(void) ok; // stage 1: best-effort; future work could mark ctx error state
|
||||
bool ok = write_full(ctxp->fd, rec.data(), rec.size());
|
||||
if (ok) {
|
||||
ctxp->approx_size_bytes += static_cast<std::uint64_t>(rec.size());
|
||||
if (p.urgent_flush) {
|
||||
(void) ::fsync(ctxp->fd);
|
||||
ctxp->last_fsync_ns = now_ns();
|
||||
}
|
||||
if (p.type == SwapRecType::CHKPT && compact_bytes > 0 &&
|
||||
ctxp->approx_size_bytes >= static_cast<std::uint64_t>(compact_bytes)) {
|
||||
(void) compact_to_checkpoint(*ctxp, rec);
|
||||
}
|
||||
}
|
||||
(void) ok; // best-effort; future work could mark ctx error state
|
||||
}
|
||||
|
||||
|
||||
@@ -743,6 +1127,20 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
|
||||
return false;
|
||||
}
|
||||
|
||||
// Ensure replayed edits don't get re-journaled if the caller forgot to detach/suspend.
|
||||
kte::SwapRecorder *prev_rec = buf.SwapRecorder();
|
||||
buf.SetSwapRecorder(nullptr);
|
||||
struct RestoreSwapRecorder {
|
||||
Buffer &b;
|
||||
kte::SwapRecorder *prev;
|
||||
|
||||
|
||||
~RestoreSwapRecorder()
|
||||
{
|
||||
b.SetSwapRecorder(prev);
|
||||
}
|
||||
} restore{buf, prev_rec};
|
||||
|
||||
for (;;) {
|
||||
std::uint8_t head[4];
|
||||
in.read(reinterpret_cast<char *>(head), sizeof(head));
|
||||
@@ -780,18 +1178,18 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
|
||||
}
|
||||
|
||||
// Apply record
|
||||
std::size_t off = 0;
|
||||
if (payload.empty()) {
|
||||
err = "Swap record missing payload";
|
||||
return false;
|
||||
}
|
||||
const std::uint8_t encver = payload[off++];
|
||||
if (encver != 1) {
|
||||
err = "Unsupported swap payload encoding";
|
||||
return false;
|
||||
}
|
||||
switch (type) {
|
||||
case SwapRecType::INS: {
|
||||
std::size_t off = 0;
|
||||
if (payload.empty()) {
|
||||
err = "Swap record missing INS payload";
|
||||
return false;
|
||||
}
|
||||
const std::uint8_t encver = payload[off++];
|
||||
if (encver != 1) {
|
||||
err = "Unsupported swap payload encoding";
|
||||
return false;
|
||||
}
|
||||
std::uint32_t row = 0, col = 0, nbytes = 0;
|
||||
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col) || !parse_u32_le(
|
||||
payload, off, nbytes)) {
|
||||
@@ -807,6 +1205,16 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
|
||||
break;
|
||||
}
|
||||
case SwapRecType::DEL: {
|
||||
std::size_t off = 0;
|
||||
if (payload.empty()) {
|
||||
err = "Swap record missing DEL payload";
|
||||
return false;
|
||||
}
|
||||
const std::uint8_t encver = payload[off++];
|
||||
if (encver != 1) {
|
||||
err = "Unsupported swap payload encoding";
|
||||
return false;
|
||||
}
|
||||
std::uint32_t row = 0, col = 0, dlen = 0;
|
||||
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col) || !parse_u32_le(
|
||||
payload, off, dlen)) {
|
||||
@@ -817,6 +1225,16 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
|
||||
break;
|
||||
}
|
||||
case SwapRecType::SPLIT: {
|
||||
std::size_t off = 0;
|
||||
if (payload.empty()) {
|
||||
err = "Swap record missing SPLIT payload";
|
||||
return false;
|
||||
}
|
||||
const std::uint8_t encver = payload[off++];
|
||||
if (encver != 1) {
|
||||
err = "Unsupported swap payload encoding";
|
||||
return false;
|
||||
}
|
||||
std::uint32_t row = 0, col = 0;
|
||||
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col)) {
|
||||
err = "Malformed SPLIT payload";
|
||||
@@ -826,6 +1244,16 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
|
||||
break;
|
||||
}
|
||||
case SwapRecType::JOIN: {
|
||||
std::size_t off = 0;
|
||||
if (payload.empty()) {
|
||||
err = "Swap record missing JOIN payload";
|
||||
return false;
|
||||
}
|
||||
const std::uint8_t encver = payload[off++];
|
||||
if (encver != 1) {
|
||||
err = "Unsupported swap payload encoding";
|
||||
return false;
|
||||
}
|
||||
std::uint32_t row = 0;
|
||||
if (!parse_u32_le(payload, off, row)) {
|
||||
err = "Malformed JOIN payload";
|
||||
@@ -834,8 +1262,32 @@ SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &
|
||||
buf.join_lines((int) row);
|
||||
break;
|
||||
}
|
||||
case SwapRecType::CHKPT: {
|
||||
std::size_t off = 0;
|
||||
if (payload.size() < 5) {
|
||||
err = "Malformed CHKPT payload";
|
||||
return false;
|
||||
}
|
||||
const std::uint8_t encver = payload[off++];
|
||||
if (encver != 1) {
|
||||
err = "Unsupported swap checkpoint encoding";
|
||||
return false;
|
||||
}
|
||||
std::uint32_t nbytes = 0;
|
||||
if (!parse_u32_le(payload, off, nbytes)) {
|
||||
err = "Malformed CHKPT payload";
|
||||
return false;
|
||||
}
|
||||
if (off + nbytes > payload.size()) {
|
||||
err = "Truncated CHKPT payload bytes";
|
||||
return false;
|
||||
}
|
||||
buf.replace_all_bytes(std::string_view(reinterpret_cast<const char *>(payload.data() + off),
|
||||
(std::size_t) nbytes));
|
||||
break;
|
||||
}
|
||||
default:
|
||||
// Ignore unknown types for forward-compat in stage 1
|
||||
// Ignore unknown types for forward-compat
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user