Add swap journaling and group undo/redo with extensive tests.
- Introduced SwapManager for sidecar journaling of buffer mutations, with a safe recovery mechanism. - Added group undo/redo functionality, allowing atomic grouping of related edits. - Implemented `SwapRecorder` and integrated it as a callback interface for mutations. - Added unit tests for swap journaling (save/load/replay) and undo grouping. - Refactored undo to support group tracking and ID management. - Updated CMake to include the new tests and swap journaling logic.
This commit is contained in:
659
Swap.cc
659
Swap.cc
@@ -5,6 +5,9 @@
|
||||
#include <chrono>
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include <ctime>
|
||||
#include <cstdlib>
|
||||
#include <fstream>
|
||||
#include <filesystem>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
@@ -18,23 +21,66 @@ namespace {
|
||||
constexpr std::uint8_t MAGIC[8] = {'K', 'T', 'E', '_', 'S', 'W', 'P', '\0'};
|
||||
constexpr std::uint32_t VERSION = 1;
|
||||
|
||||
// Write all bytes in buf to fd, handling EINTR and partial writes.
|
||||
static bool write_full(int fd, const void *buf, size_t len)
|
||||
|
||||
static fs::path
|
||||
xdg_state_home()
|
||||
{
|
||||
const std::uint8_t *p = static_cast<const std::uint8_t *>(buf);
|
||||
while (len > 0) {
|
||||
ssize_t n = ::write(fd, p, len);
|
||||
if (n < 0) {
|
||||
if (errno == EINTR)
|
||||
continue;
|
||||
return false;
|
||||
}
|
||||
if (n == 0)
|
||||
return false; // shouldn't happen for regular files; treat as error
|
||||
p += static_cast<size_t>(n);
|
||||
len -= static_cast<size_t>(n);
|
||||
}
|
||||
return true;
|
||||
if (const char *p = std::getenv("XDG_STATE_HOME")) {
|
||||
if (*p)
|
||||
return fs::path(p);
|
||||
}
|
||||
if (const char *home = std::getenv("HOME")) {
|
||||
if (*home)
|
||||
return fs::path(home) / ".local" / "state";
|
||||
}
|
||||
// Last resort: still provide a stable per-user-ish location.
|
||||
return fs::temp_directory_path() / "kte" / "state";
|
||||
}
|
||||
|
||||
|
||||
static std::uint64_t
|
||||
fnv1a64(std::string_view s)
|
||||
{
|
||||
std::uint64_t h = 14695981039346656037ULL;
|
||||
for (unsigned char ch: s) {
|
||||
h ^= (std::uint64_t) ch;
|
||||
h *= 1099511628211ULL;
|
||||
}
|
||||
return h;
|
||||
}
|
||||
|
||||
|
||||
static std::string
|
||||
hex_u64(std::uint64_t v)
|
||||
{
|
||||
static const char *kHex = "0123456789abcdef";
|
||||
char out[16];
|
||||
for (int i = 15; i >= 0; --i) {
|
||||
out[i] = kHex[v & 0xFULL];
|
||||
v >>= 4;
|
||||
}
|
||||
return std::string(out, sizeof(out));
|
||||
}
|
||||
|
||||
|
||||
// Write all bytes in buf to fd, handling EINTR and partial writes.
|
||||
static bool
|
||||
write_full(int fd, const void *buf, size_t len)
|
||||
{
|
||||
const std::uint8_t *p = static_cast<const std::uint8_t *>(buf);
|
||||
while (len > 0) {
|
||||
ssize_t n = ::write(fd, p, len);
|
||||
if (n < 0) {
|
||||
if (errno == EINTR)
|
||||
continue;
|
||||
return false;
|
||||
}
|
||||
if (n == 0)
|
||||
return false; // shouldn't happen for regular files; treat as error
|
||||
p += static_cast<size_t>(n);
|
||||
len -= static_cast<size_t>(n);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +96,8 @@ SwapManager::SwapManager()
|
||||
|
||||
SwapManager::~SwapManager()
|
||||
{
|
||||
// Best-effort: drain queued records before stopping the writer.
|
||||
Flush();
|
||||
running_.store(false);
|
||||
cv_.notify_all();
|
||||
if (worker_.joinable())
|
||||
@@ -62,30 +110,108 @@ SwapManager::~SwapManager()
|
||||
|
||||
|
||||
void
|
||||
SwapManager::Attach(Buffer * /*buf*/)
|
||||
SwapManager::Flush(Buffer *buf)
|
||||
{
|
||||
// Stage 1: lazy-open on first record; nothing to do here.
|
||||
(void) buf; // stage 1: flushes all buffers
|
||||
std::unique_lock<std::mutex> lk(mtx_);
|
||||
const std::uint64_t target = next_seq_;
|
||||
// Wake the writer in case it's waiting on the interval.
|
||||
cv_.notify_one();
|
||||
cv_.wait(lk, [&] {
|
||||
return queue_.empty() && inflight_ == 0 && last_processed_ >= target;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::Detach(Buffer * /*buf*/)
|
||||
SwapManager::BufferRecorder::OnInsert(int row, int col, std::string_view bytes)
|
||||
{
|
||||
// Stage 1: keep files open until manager destruction; future work can close per-buffer.
|
||||
m_.RecordInsert(buf_, row, col, bytes);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::BufferRecorder::OnDelete(int row, int col, std::size_t len)
|
||||
{
|
||||
m_.RecordDelete(buf_, row, col, len);
|
||||
}
|
||||
|
||||
|
||||
SwapRecorder *
|
||||
SwapManager::RecorderFor(Buffer *buf)
|
||||
{
|
||||
if (!buf)
|
||||
return nullptr;
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = recorders_.find(buf);
|
||||
if (it != recorders_.end())
|
||||
return it->second.get();
|
||||
// Create on-demand. Recording calls will no-op until Attach() has been called.
|
||||
auto rec = std::make_unique<BufferRecorder>(*this, *buf);
|
||||
SwapRecorder *ptr = rec.get();
|
||||
recorders_[buf] = std::move(rec);
|
||||
return ptr;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::Attach(Buffer *buf)
|
||||
{
|
||||
if (!buf)
|
||||
return;
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
JournalCtx &ctx = journals_[buf];
|
||||
if (ctx.path.empty())
|
||||
ctx.path = ComputeSidecarPath(*buf);
|
||||
// Ensure a recorder exists as well.
|
||||
if (recorders_.find(buf) == recorders_.end()) {
|
||||
recorders_[buf] = std::make_unique<BufferRecorder>(*this, *buf);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::Detach(Buffer *buf)
|
||||
{
|
||||
if (!buf)
|
||||
return;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(buf);
|
||||
if (it != journals_.end()) {
|
||||
it->second.suspended = true;
|
||||
}
|
||||
}
|
||||
Flush(buf);
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(buf);
|
||||
if (it != journals_.end()) {
|
||||
close_ctx(it->second);
|
||||
journals_.erase(it);
|
||||
}
|
||||
recorders_.erase(buf);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::NotifyFilenameChanged(Buffer &buf)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end())
|
||||
return;
|
||||
it->second.suspended = true;
|
||||
}
|
||||
Flush(&buf);
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end())
|
||||
return;
|
||||
JournalCtx &ctx = it->second;
|
||||
// Close existing file handle, update path; lazily reopen on next write
|
||||
close_ctx(ctx);
|
||||
ctx.path = ComputeSidecarPath(buf);
|
||||
ctx.path = ComputeSidecarPath(buf);
|
||||
ctx.suspended = false;
|
||||
}
|
||||
|
||||
|
||||
@@ -93,47 +219,100 @@ void
|
||||
SwapManager::SetSuspended(Buffer &buf, bool on)
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto path = ComputeSidecarPath(buf);
|
||||
// Create/update context for this buffer
|
||||
JournalCtx &ctx = journals_[&buf];
|
||||
ctx.path = path;
|
||||
ctx.suspended = on;
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end())
|
||||
return;
|
||||
it->second.suspended = on;
|
||||
}
|
||||
|
||||
|
||||
SwapManager::SuspendGuard::SuspendGuard(SwapManager &m, Buffer *b)
|
||||
: m_(m), buf_(b), prev_(false)
|
||||
{
|
||||
// Suspend recording while guard is alive
|
||||
if (buf_)
|
||||
m_.SetSuspended(*buf_, true);
|
||||
if (!buf_)
|
||||
return;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(m_.mtx_);
|
||||
auto it = m_.journals_.find(buf_);
|
||||
if (it != m_.journals_.end()) {
|
||||
prev_ = it->second.suspended;
|
||||
it->second.suspended = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SwapManager::SuspendGuard::~SuspendGuard()
|
||||
{
|
||||
if (buf_)
|
||||
m_.SetSuspended(*buf_, false);
|
||||
if (!buf_)
|
||||
return;
|
||||
std::lock_guard<std::mutex> lg(m_.mtx_);
|
||||
auto it = m_.journals_.find(buf_);
|
||||
if (it != m_.journals_.end()) {
|
||||
it->second.suspended = prev_;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
std::string
|
||||
SwapManager::ComputeSidecarPath(const Buffer &buf)
|
||||
{
|
||||
if (buf.IsFileBacked() || !buf.Filename().empty()) {
|
||||
// Always place swap under an XDG home-appropriate state directory.
|
||||
// This avoids cluttering working directories and prevents stomping on
|
||||
// swap files when multiple different paths share the same basename.
|
||||
fs::path root = xdg_state_home() / "kte" / "swap";
|
||||
|
||||
auto encode_path = [](std::string s) -> std::string {
|
||||
// Turn an absolute path like "/home/kyle/tmp/test.txt" into
|
||||
// "home!kyle!tmp!test.txt" so swap files are human-identifiable.
|
||||
//
|
||||
// Notes:
|
||||
// - We strip a single leading path separator so absolute paths don't start with '!'.
|
||||
// - We replace both '/' and '\\' with '!'.
|
||||
// - We leave other characters as-is (spaces are OK on POSIX).
|
||||
if (!s.empty() && (s[0] == '/' || s[0] == '\\'))
|
||||
s.erase(0, 1);
|
||||
for (char &ch: s) {
|
||||
if (ch == '/' || ch == '\\')
|
||||
ch = '!';
|
||||
}
|
||||
return s;
|
||||
};
|
||||
|
||||
if (!buf.Filename().empty()) {
|
||||
fs::path p(buf.Filename());
|
||||
fs::path dir = p.parent_path();
|
||||
std::string base = p.filename().string();
|
||||
std::string side = "." + base + ".kte.swp";
|
||||
return (dir / side).string();
|
||||
std::string key;
|
||||
try {
|
||||
key = fs::weakly_canonical(p).string();
|
||||
} catch (...) {
|
||||
try {
|
||||
key = fs::absolute(p).string();
|
||||
} catch (...) {
|
||||
key = buf.Filename();
|
||||
}
|
||||
}
|
||||
std::string encoded = encode_path(key);
|
||||
if (!encoded.empty()) {
|
||||
std::string name = encoded + ".swp";
|
||||
// Avoid filesystem/path length issues; fall back to hashed naming.
|
||||
// NAME_MAX is often 255 on POSIX, but keep extra headroom.
|
||||
if (name.size() <= 200) {
|
||||
return (root / name).string();
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: stable, shorter name based on basename + hash.
|
||||
std::string base = p.filename().string();
|
||||
const std::string name = base + "." + hex_u64(fnv1a64(key)) + ".swp";
|
||||
return (root / name).string();
|
||||
}
|
||||
// unnamed: $TMPDIR/kte/unnamed-<ptr>.kte.swp (best-effort)
|
||||
const char *tmp = std::getenv("TMPDIR");
|
||||
fs::path t = tmp ? fs::path(tmp) : fs::temp_directory_path();
|
||||
fs::path d = t / "kte";
|
||||
char bufptr[32];
|
||||
std::snprintf(bufptr, sizeof(bufptr), "%p", (const void *) &buf);
|
||||
return (d / (std::string("unnamed-") + bufptr + ".kte.swp")).string();
|
||||
|
||||
// Unnamed buffers: unique within the process.
|
||||
static std::atomic<std::uint64_t> ctr{0};
|
||||
const std::uint64_t n = ++ctr;
|
||||
const int pid = (int) ::getpid();
|
||||
const std::string name = "unnamed-" + std::to_string(pid) + "-" + std::to_string(n) + ".swp";
|
||||
return (root / name).string();
|
||||
}
|
||||
|
||||
|
||||
@@ -163,53 +342,69 @@ SwapManager::ensure_parent_dir(const std::string &path)
|
||||
|
||||
|
||||
bool
|
||||
SwapManager::write_header(JournalCtx &ctx)
|
||||
SwapManager::write_header(int fd)
|
||||
{
|
||||
if (ctx.fd < 0)
|
||||
if (fd < 0)
|
||||
return false;
|
||||
// Write a simple 64-byte header
|
||||
// Fixed 64-byte header (v1)
|
||||
// [magic 8][version u32][flags u32][created_time u64][reserved/padding]
|
||||
std::uint8_t hdr[64];
|
||||
std::memset(hdr, 0, sizeof(hdr));
|
||||
std::memcpy(hdr, MAGIC, 8);
|
||||
std::uint32_t ver = VERSION;
|
||||
std::memcpy(hdr + 8, &ver, sizeof(ver));
|
||||
// version (little-endian)
|
||||
hdr[8] = static_cast<std::uint8_t>(VERSION & 0xFFu);
|
||||
hdr[9] = static_cast<std::uint8_t>((VERSION >> 8) & 0xFFu);
|
||||
hdr[10] = static_cast<std::uint8_t>((VERSION >> 16) & 0xFFu);
|
||||
hdr[11] = static_cast<std::uint8_t>((VERSION >> 24) & 0xFFu);
|
||||
// flags = 0
|
||||
// created_time (unix seconds; little-endian)
|
||||
std::uint64_t ts = static_cast<std::uint64_t>(std::time(nullptr));
|
||||
std::memcpy(hdr + 16, &ts, sizeof(ts));
|
||||
ssize_t w = ::write(ctx.fd, hdr, sizeof(hdr));
|
||||
return (w == (ssize_t) sizeof(hdr));
|
||||
put_le64(hdr + 16, ts);
|
||||
return write_full(fd, hdr, sizeof(hdr));
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
SwapManager::open_ctx(JournalCtx &ctx)
|
||||
SwapManager::open_ctx(JournalCtx &ctx, const std::string &path)
|
||||
{
|
||||
if (ctx.fd >= 0)
|
||||
return true;
|
||||
if (!ensure_parent_dir(ctx.path))
|
||||
if (!ensure_parent_dir(path))
|
||||
return false;
|
||||
// Create or open with 0600 perms
|
||||
int fd = ::open(ctx.path.c_str(), O_CREAT | O_RDWR, 0600);
|
||||
int flags = O_CREAT | O_WRONLY | O_APPEND;
|
||||
#ifdef O_CLOEXEC
|
||||
flags |= O_CLOEXEC;
|
||||
#endif
|
||||
int fd = ::open(path.c_str(), flags, 0600);
|
||||
if (fd < 0)
|
||||
return false;
|
||||
// Detect if file is new/empty to write header
|
||||
// Ensure permissions even if file already existed.
|
||||
(void) ::fchmod(fd, 0600);
|
||||
struct stat st{};
|
||||
if (fstat(fd, &st) != 0) {
|
||||
::close(fd);
|
||||
return false;
|
||||
}
|
||||
ctx.fd = fd;
|
||||
ctx.file = fdopen(fd, "ab");
|
||||
if (!ctx.file) {
|
||||
// If an existing file is too small to contain the fixed header, truncate
|
||||
// and restart.
|
||||
if (st.st_size > 0 && st.st_size < 64) {
|
||||
::close(fd);
|
||||
ctx.fd = -1;
|
||||
return false;
|
||||
int tflags = O_CREAT | O_WRONLY | O_TRUNC | O_APPEND;
|
||||
#ifdef O_CLOEXEC
|
||||
tflags |= O_CLOEXEC;
|
||||
#endif
|
||||
fd = ::open(path.c_str(), tflags, 0600);
|
||||
if (fd < 0)
|
||||
return false;
|
||||
(void) ::fchmod(fd, 0600);
|
||||
st.st_size = 0;
|
||||
}
|
||||
ctx.fd = fd;
|
||||
ctx.path = path;
|
||||
if (st.st_size == 0) {
|
||||
ctx.header_ok = write_header(ctx);
|
||||
ctx.header_ok = write_header(fd);
|
||||
} else {
|
||||
ctx.header_ok = true; // trust existing file for stage 1
|
||||
// Seek to end to append
|
||||
::lseek(ctx.fd, 0, SEEK_END);
|
||||
ctx.header_ok = true; // stage 1: trust existing header
|
||||
}
|
||||
return ctx.header_ok;
|
||||
}
|
||||
@@ -218,16 +413,12 @@ SwapManager::open_ctx(JournalCtx &ctx)
|
||||
void
|
||||
SwapManager::close_ctx(JournalCtx &ctx)
|
||||
{
|
||||
if (ctx.file) {
|
||||
std::fflush((FILE *) ctx.file);
|
||||
::fsync(ctx.fd);
|
||||
std::fclose((FILE *) ctx.file);
|
||||
ctx.file = nullptr;
|
||||
}
|
||||
if (ctx.fd >= 0) {
|
||||
(void) ::fsync(ctx.fd);
|
||||
::close(ctx.fd);
|
||||
ctx.fd = -1;
|
||||
}
|
||||
ctx.header_ok = false;
|
||||
}
|
||||
|
||||
|
||||
@@ -240,35 +431,48 @@ SwapManager::crc32(const std::uint8_t *data, std::size_t len, std::uint32_t seed
|
||||
for (std::uint32_t i = 0; i < 256; ++i) {
|
||||
std::uint32_t c = i;
|
||||
for (int j = 0; j < 8; ++j)
|
||||
c = (c & 1) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
|
||||
c = (c & 1) ? (0xEDB88320u ^ (c >> 1)) : (c >> 1);
|
||||
table[i] = c;
|
||||
}
|
||||
inited = true;
|
||||
}
|
||||
std::uint32_t c = ~seed;
|
||||
for (std::size_t i = 0; i < len; ++i)
|
||||
c = table[(c ^ data[i]) & 0xFFu] ^ (c >> 8);
|
||||
c = table[(c ^ data[i]) & 0xFFu] ^ (c >> 8);
|
||||
return ~c;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::put_varu64(std::vector<std::uint8_t> &out, std::uint64_t v)
|
||||
SwapManager::put_le32(std::vector<std::uint8_t> &out, std::uint32_t v)
|
||||
{
|
||||
while (v >= 0x80) {
|
||||
out.push_back(static_cast<std::uint8_t>(v) | 0x80);
|
||||
v >>= 7;
|
||||
}
|
||||
out.push_back(static_cast<std::uint8_t>(v));
|
||||
out.push_back(static_cast<std::uint8_t>(v & 0xFFu));
|
||||
out.push_back(static_cast<std::uint8_t>((v >> 8) & 0xFFu));
|
||||
out.push_back(static_cast<std::uint8_t>((v >> 16) & 0xFFu));
|
||||
out.push_back(static_cast<std::uint8_t>((v >> 24) & 0xFFu));
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::put_u24(std::uint8_t dst[3], std::uint32_t v)
|
||||
SwapManager::put_le64(std::uint8_t *dst, std::uint64_t v)
|
||||
{
|
||||
dst[0] = static_cast<std::uint8_t>((v >> 16) & 0xFF);
|
||||
dst[1] = static_cast<std::uint8_t>((v >> 8) & 0xFF);
|
||||
dst[2] = static_cast<std::uint8_t>(v & 0xFF);
|
||||
dst[0] = static_cast<std::uint8_t>(v & 0xFFu);
|
||||
dst[1] = static_cast<std::uint8_t>((v >> 8) & 0xFFu);
|
||||
dst[2] = static_cast<std::uint8_t>((v >> 16) & 0xFFu);
|
||||
dst[3] = static_cast<std::uint8_t>((v >> 24) & 0xFFu);
|
||||
dst[4] = static_cast<std::uint8_t>((v >> 32) & 0xFFu);
|
||||
dst[5] = static_cast<std::uint8_t>((v >> 40) & 0xFFu);
|
||||
dst[6] = static_cast<std::uint8_t>((v >> 48) & 0xFFu);
|
||||
dst[7] = static_cast<std::uint8_t>((v >> 56) & 0xFFu);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::put_u24_le(std::uint8_t dst[3], std::uint32_t v)
|
||||
{
|
||||
dst[0] = static_cast<std::uint8_t>(v & 0xFFu);
|
||||
dst[1] = static_cast<std::uint8_t>((v >> 8) & 0xFFu);
|
||||
dst[2] = static_cast<std::uint8_t>((v >> 16) & 0xFFu);
|
||||
}
|
||||
|
||||
|
||||
@@ -277,6 +481,7 @@ SwapManager::enqueue(Pending &&p)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
p.seq = ++next_seq_;
|
||||
queue_.emplace_back(std::move(p));
|
||||
}
|
||||
cv_.notify_one();
|
||||
@@ -288,16 +493,20 @@ SwapManager::RecordInsert(Buffer &buf, int row, int col, std::string_view text)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
if (journals_[&buf].suspended)
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end() || it->second.suspended)
|
||||
return;
|
||||
}
|
||||
Pending p;
|
||||
p.buf = &buf;
|
||||
p.type = SwapRecType::INS;
|
||||
// payload: varint row, varint col, varint len, bytes
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(std::max(0, row)));
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(std::max(0, col)));
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(text.size()));
|
||||
// payload v1: [encver u8=1][row u32][col u32][nbytes u32][bytes]
|
||||
if (text.size() > 0xFFFFFFFFu)
|
||||
return;
|
||||
p.payload.push_back(1);
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, row)));
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, col)));
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(text.size()));
|
||||
p.payload.insert(p.payload.end(), reinterpret_cast<const std::uint8_t *>(text.data()),
|
||||
reinterpret_cast<const std::uint8_t *>(text.data()) + text.size());
|
||||
enqueue(std::move(p));
|
||||
@@ -309,15 +518,20 @@ SwapManager::RecordDelete(Buffer &buf, int row, int col, std::size_t len)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
if (journals_[&buf].suspended)
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end() || it->second.suspended)
|
||||
return;
|
||||
}
|
||||
if (len > 0xFFFFFFFFu)
|
||||
return;
|
||||
Pending p;
|
||||
p.buf = &buf;
|
||||
p.type = SwapRecType::DEL;
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(std::max(0, row)));
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(std::max(0, col)));
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(len));
|
||||
// payload v1: [encver u8=1][row u32][col u32][len u32]
|
||||
p.payload.push_back(1);
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, row)));
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, col)));
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(len));
|
||||
enqueue(std::move(p));
|
||||
}
|
||||
|
||||
@@ -327,14 +541,17 @@ SwapManager::RecordSplit(Buffer &buf, int row, int col)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
if (journals_[&buf].suspended)
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end() || it->second.suspended)
|
||||
return;
|
||||
}
|
||||
Pending p;
|
||||
p.buf = &buf;
|
||||
p.type = SwapRecType::SPLIT;
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(std::max(0, row)));
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(std::max(0, col)));
|
||||
// payload v1: [encver u8=1][row u32][col u32]
|
||||
p.payload.push_back(1);
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, row)));
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, col)));
|
||||
enqueue(std::move(p));
|
||||
}
|
||||
|
||||
@@ -344,13 +561,16 @@ SwapManager::RecordJoin(Buffer &buf, int row)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
if (journals_[&buf].suspended)
|
||||
auto it = journals_.find(&buf);
|
||||
if (it == journals_.end() || it->second.suspended)
|
||||
return;
|
||||
}
|
||||
Pending p;
|
||||
p.buf = &buf;
|
||||
p.type = SwapRecType::JOIN;
|
||||
put_varu64(p.payload, static_cast<std::uint64_t>(std::max(0, row)));
|
||||
// payload v1: [encver u8=1][row u32]
|
||||
p.payload.push_back(1);
|
||||
put_le32(p.payload, static_cast<std::uint32_t>(std::max(0, row)));
|
||||
enqueue(std::move(p));
|
||||
}
|
||||
|
||||
@@ -358,59 +578,91 @@ SwapManager::RecordJoin(Buffer &buf, int row)
|
||||
void
|
||||
SwapManager::writer_loop()
|
||||
{
|
||||
while (running_.load()) {
|
||||
for (;;) {
|
||||
std::vector<Pending> batch;
|
||||
{
|
||||
std::unique_lock<std::mutex> lk(mtx_);
|
||||
if (queue_.empty()) {
|
||||
if (!running_.load())
|
||||
break;
|
||||
cv_.wait_for(lk, std::chrono::milliseconds(cfg_.flush_interval_ms));
|
||||
}
|
||||
if (!queue_.empty()) {
|
||||
batch.swap(queue_);
|
||||
inflight_ += batch.size();
|
||||
}
|
||||
}
|
||||
if (batch.empty())
|
||||
continue;
|
||||
|
||||
// Group by buffer path to minimize fsyncs
|
||||
for (const Pending &p: batch) {
|
||||
process_one(p);
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
if (p.seq > last_processed_)
|
||||
last_processed_ = p.seq;
|
||||
if (inflight_ > 0)
|
||||
--inflight_;
|
||||
}
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
// Throttled fsync: best-effort
|
||||
// Iterate unique contexts and fsync if needed
|
||||
// For stage 1, fsync all once per interval
|
||||
// Throttled fsync: best-effort (grouped)
|
||||
std::vector<int> to_sync;
|
||||
std::uint64_t now = now_ns();
|
||||
for (auto &kv: journals_) {
|
||||
JournalCtx &ctx = kv.second;
|
||||
if (ctx.fd >= 0) {
|
||||
if (ctx.last_fsync_ns == 0 || (now - ctx.last_fsync_ns) / 1000000ULL >= cfg_.
|
||||
fsync_interval_ms) {
|
||||
::fsync(ctx.fd);
|
||||
ctx.last_fsync_ns = now;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
for (auto &kv: journals_) {
|
||||
JournalCtx &ctx = kv.second;
|
||||
if (ctx.fd >= 0) {
|
||||
if (ctx.last_fsync_ns == 0 || (now - ctx.last_fsync_ns) / 1000000ULL >=
|
||||
cfg_.fsync_interval_ms) {
|
||||
ctx.last_fsync_ns = now;
|
||||
to_sync.push_back(ctx.fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int fd: to_sync) {
|
||||
(void) ::fsync(fd);
|
||||
}
|
||||
}
|
||||
// Wake any waiters.
|
||||
cv_.notify_all();
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
SwapManager::process_one(const Pending &p)
|
||||
{
|
||||
if (!p.buf)
|
||||
return;
|
||||
Buffer &buf = *p.buf;
|
||||
// Resolve context by path derived from buffer
|
||||
std::string path = ComputeSidecarPath(buf);
|
||||
// Get or create context keyed by this buffer pointer (stage 1 simplification)
|
||||
JournalCtx &ctx = journals_[p.buf];
|
||||
if (ctx.path.empty())
|
||||
ctx.path = path;
|
||||
if (!open_ctx(ctx))
|
||||
|
||||
JournalCtx *ctxp = nullptr;
|
||||
std::string path;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(mtx_);
|
||||
auto it = journals_.find(p.buf);
|
||||
if (it == journals_.end())
|
||||
return;
|
||||
if (it->second.suspended)
|
||||
return;
|
||||
if (it->second.path.empty())
|
||||
it->second.path = ComputeSidecarPath(buf);
|
||||
path = it->second.path;
|
||||
ctxp = &it->second;
|
||||
}
|
||||
if (!ctxp)
|
||||
return;
|
||||
if (!open_ctx(*ctxp, path))
|
||||
return;
|
||||
if (p.payload.size() > 0xFFFFFFu)
|
||||
return;
|
||||
|
||||
// Build record: [type u8][len u24][payload][crc32 u32]
|
||||
std::uint8_t len3[3];
|
||||
put_u24(len3, static_cast<std::uint32_t>(p.payload.size()));
|
||||
put_u24_le(len3, static_cast<std::uint32_t>(p.payload.size()));
|
||||
|
||||
std::uint8_t head[4];
|
||||
head[0] = static_cast<std::uint8_t>(p.type);
|
||||
@@ -422,13 +674,170 @@ SwapManager::process_one(const Pending &p)
|
||||
c = crc32(head, sizeof(head), c);
|
||||
if (!p.payload.empty())
|
||||
c = crc32(p.payload.data(), p.payload.size(), c);
|
||||
std::uint8_t crcbytes[4];
|
||||
crcbytes[0] = static_cast<std::uint8_t>(c & 0xFFu);
|
||||
crcbytes[1] = static_cast<std::uint8_t>((c >> 8) & 0xFFu);
|
||||
crcbytes[2] = static_cast<std::uint8_t>((c >> 16) & 0xFFu);
|
||||
crcbytes[3] = static_cast<std::uint8_t>((c >> 24) & 0xFFu);
|
||||
|
||||
// Write (handle partial writes and check results)
|
||||
bool ok = write_full(ctx.fd, head, sizeof(head));
|
||||
if (ok && !p.payload.empty())
|
||||
ok = write_full(ctx.fd, p.payload.data(), p.payload.size());
|
||||
if (ok)
|
||||
ok = write_full(ctx.fd, &c, sizeof(c));
|
||||
(void) ok; // stage 1: best-effort; future work could mark ctx error state
|
||||
// Write (handle partial writes and check results)
|
||||
bool ok = write_full(ctxp->fd, head, sizeof(head));
|
||||
if (ok && !p.payload.empty())
|
||||
ok = write_full(ctxp->fd, p.payload.data(), p.payload.size());
|
||||
if (ok)
|
||||
ok = write_full(ctxp->fd, crcbytes, sizeof(crcbytes));
|
||||
(void) ok; // stage 1: best-effort; future work could mark ctx error state
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
read_exact(std::ifstream &in, void *dst, std::size_t n)
|
||||
{
|
||||
in.read(static_cast<char *>(dst), static_cast<std::streamsize>(n));
|
||||
return in.good() && static_cast<std::size_t>(in.gcount()) == n;
|
||||
}
|
||||
|
||||
|
||||
static std::uint32_t
|
||||
read_le32(const std::uint8_t b[4])
|
||||
{
|
||||
return (std::uint32_t) b[0] | ((std::uint32_t) b[1] << 8) | ((std::uint32_t) b[2] << 16) | (
|
||||
(std::uint32_t) b[3] << 24);
|
||||
}
|
||||
|
||||
|
||||
static bool
|
||||
parse_u32_le(const std::vector<std::uint8_t> &p, std::size_t &off, std::uint32_t &out)
|
||||
{
|
||||
if (off + 4 > p.size())
|
||||
return false;
|
||||
out = (std::uint32_t) p[off] | ((std::uint32_t) p[off + 1] << 8) | ((std::uint32_t) p[off + 2] << 16) |
|
||||
((std::uint32_t) p[off + 3] << 24);
|
||||
off += 4;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
SwapManager::ReplayFile(Buffer &buf, const std::string &swap_path, std::string &err)
|
||||
{
|
||||
err.clear();
|
||||
std::ifstream in(swap_path, std::ios::binary);
|
||||
if (!in) {
|
||||
err = "Failed to open swap file for replay: " + swap_path;
|
||||
return false;
|
||||
}
|
||||
|
||||
std::uint8_t hdr[64];
|
||||
if (!read_exact(in, hdr, sizeof(hdr))) {
|
||||
err = "Swap file truncated (header): " + swap_path;
|
||||
return false;
|
||||
}
|
||||
if (std::memcmp(hdr, MAGIC, 8) != 0) {
|
||||
err = "Swap file has bad magic: " + swap_path;
|
||||
return false;
|
||||
}
|
||||
const std::uint32_t ver = read_le32(hdr + 8);
|
||||
if (ver != VERSION) {
|
||||
err = "Unsupported swap version: " + std::to_string(ver);
|
||||
return false;
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
std::uint8_t head[4];
|
||||
in.read(reinterpret_cast<char *>(head), sizeof(head));
|
||||
const std::size_t got_head = static_cast<std::size_t>(in.gcount());
|
||||
if (got_head == 0 && in.eof()) {
|
||||
return true; // clean EOF
|
||||
}
|
||||
if (got_head != sizeof(head)) {
|
||||
err = "Swap file truncated (record header): " + swap_path;
|
||||
return false;
|
||||
}
|
||||
|
||||
const SwapRecType type = static_cast<SwapRecType>(head[0]);
|
||||
const std::size_t len = (std::size_t) head[1] | ((std::size_t) head[2] << 8) | (
|
||||
(std::size_t) head[3] << 16);
|
||||
std::vector<std::uint8_t> payload;
|
||||
payload.resize(len);
|
||||
if (len > 0 && !read_exact(in, payload.data(), len)) {
|
||||
err = "Swap file truncated (payload): " + swap_path;
|
||||
return false;
|
||||
}
|
||||
std::uint8_t crcbytes[4];
|
||||
if (!read_exact(in, crcbytes, sizeof(crcbytes))) {
|
||||
err = "Swap file truncated (crc): " + swap_path;
|
||||
return false;
|
||||
}
|
||||
const std::uint32_t want_crc = read_le32(crcbytes);
|
||||
std::uint32_t got_crc = 0;
|
||||
got_crc = crc32(head, sizeof(head), got_crc);
|
||||
if (!payload.empty())
|
||||
got_crc = crc32(payload.data(), payload.size(), got_crc);
|
||||
if (got_crc != want_crc) {
|
||||
err = "Swap file CRC mismatch: " + swap_path;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Apply record
|
||||
std::size_t off = 0;
|
||||
if (payload.empty()) {
|
||||
err = "Swap record missing payload";
|
||||
return false;
|
||||
}
|
||||
const std::uint8_t encver = payload[off++];
|
||||
if (encver != 1) {
|
||||
err = "Unsupported swap payload encoding";
|
||||
return false;
|
||||
}
|
||||
switch (type) {
|
||||
case SwapRecType::INS: {
|
||||
std::uint32_t row = 0, col = 0, nbytes = 0;
|
||||
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col) || !parse_u32_le(
|
||||
payload, off, nbytes)) {
|
||||
err = "Malformed INS payload";
|
||||
return false;
|
||||
}
|
||||
if (off + nbytes > payload.size()) {
|
||||
err = "Truncated INS payload bytes";
|
||||
return false;
|
||||
}
|
||||
buf.insert_text((int) row, (int) col,
|
||||
std::string_view(reinterpret_cast<const char *>(payload.data() + off), nbytes));
|
||||
break;
|
||||
}
|
||||
case SwapRecType::DEL: {
|
||||
std::uint32_t row = 0, col = 0, dlen = 0;
|
||||
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col) || !parse_u32_le(
|
||||
payload, off, dlen)) {
|
||||
err = "Malformed DEL payload";
|
||||
return false;
|
||||
}
|
||||
buf.delete_text((int) row, (int) col, (std::size_t) dlen);
|
||||
break;
|
||||
}
|
||||
case SwapRecType::SPLIT: {
|
||||
std::uint32_t row = 0, col = 0;
|
||||
if (!parse_u32_le(payload, off, row) || !parse_u32_le(payload, off, col)) {
|
||||
err = "Malformed SPLIT payload";
|
||||
return false;
|
||||
}
|
||||
buf.split_line((int) row, (int) col);
|
||||
break;
|
||||
}
|
||||
case SwapRecType::JOIN: {
|
||||
std::uint32_t row = 0;
|
||||
if (!parse_u32_le(payload, off, row)) {
|
||||
err = "Malformed JOIN payload";
|
||||
return false;
|
||||
}
|
||||
buf.join_lines((int) row);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
// Ignore unknown types for forward-compat in stage 1
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} // namespace kte
|
||||
Reference in New Issue
Block a user