Implement full run pipeline

- Phase 3: rsync with --itemize-changes captured, parsed into change list
- Phase 4: per-file zstd deltas written to DELTAS/tmp/N/files/
- Phase 5: manifest.json written, atomic rename tmp/N → N
- Phase 6: PEND promoted to PREV via rm+rename
- Dry-run prints all steps without executing
This commit is contained in:
2026-03-07 01:08:30 +00:00
parent 30b90193d7
commit 96e3024991

View File

@@ -1,16 +1,16 @@
/** /**
* run command — full backup run. * run command — full backup run.
*/ */
import { rm, mkdir } from 'fs/promises'; import { rm, mkdir, rename, writeFile } from 'fs/promises';
import { join } from 'path'; import { join } from 'path';
import { run as spawn } from '../spawn.js'; import { run as spawn, capture } from '../spawn.js';
import { parseItemize } from '../itemize.js';
import { getBackend } from '../backends/index.js'; import { getBackend } from '../backends/index.js';
import { readState, writeState, PHASES } from '../state.js'; import { readState, writeState, PHASES } from '../state.js';
export async function runCommand(config) { export async function runCommand(config) {
const { source, prev, pend, deltas, backend: backendName, dryRun } = config; const { source, prev, pend, deltas, backend: backendName, dryRun: dry } = config;
const backend = getBackend(backendName); const backend = getBackend(backendName);
const dry = dryRun;
if (dry) console.log('[dry-run] No changes will be made.\n'); if (dry) console.log('[dry-run] No changes will be made.\n');
@@ -20,7 +20,7 @@ export async function runCommand(config) {
console.log(`Starting run — seq ${seq} (last complete: ${state.last_complete})`); console.log(`Starting run — seq ${seq} (last complete: ${state.last_complete})`);
// TODO: detect and handle partially-committed previous run // TODO: detect and recover from partially-committed previous run
// ── Phase 1: Clear PEND ───────────────────────────────────── // ── Phase 1: Clear PEND ─────────────────────────────────────
await setPhase(deltas, state, PHASES.CLEARING_PEND, dry); await setPhase(deltas, state, PHASES.CLEARING_PEND, dry);
@@ -35,27 +35,113 @@ export async function runCommand(config) {
// ── Phase 2: rsync PREV → PEND (local seed) ───────────────── // ── Phase 2: rsync PREV → PEND (local seed) ─────────────────
await setPhase(deltas, state, PHASES.RSYNC_LOCAL, dry); await setPhase(deltas, state, PHASES.RSYNC_LOCAL, dry);
console.log('\n── rsync PREV → PEND (local seed) ──'); console.log('\n── rsync PREV → PEND (local seed) ──');
await spawn('rsync', ['-aP', trailingSlash(prev), pend], { dryRun: dry }); await spawn('rsync', ['-aP', trailingSlash(prev), trailingSlash(pend)], { dryRun: dry });
// ── Phase 3: rsync SOURCE → PEND (remote changes) ─────────── // ── Phase 3: rsync SOURCE → PEND, capture change list ───────
await setPhase(deltas, state, PHASES.RSYNC_REMOTE, dry); await setPhase(deltas, state, PHASES.RSYNC_REMOTE, dry);
console.log('\n── rsync SOURCE → PEND ──'); console.log('\n── rsync SOURCE → PEND ──');
await spawn('rsync', ['-aP', trailingSlash(source), pend], { dryRun: dry });
// ── Phase 4: Generate delta ────────────────────────────────── let changes = [];
if (!dry) {
const rsyncArgs = [
'-aP',
'--itemize-changes',
'--delete',
trailingSlash(source),
trailingSlash(pend),
];
const output = await capture('rsync', rsyncArgs);
changes = parseItemize(output);
console.log(` ${changes.length} file(s) changed`);
for (const c of changes) console.log(` [${c.status}] ${c.path}`);
} else {
console.log(`$ rsync -aP --itemize-changes --delete ${trailingSlash(source)} ${trailingSlash(pend)}`);
console.log('[dry-run] (change list will be determined at runtime)');
}
// ── Phase 4: Generate per-file deltas into DELTAS/tmp/N/ ────
await setPhase(deltas, state, PHASES.GENERATING, dry); await setPhase(deltas, state, PHASES.GENERATING, dry);
console.log('\n── Generate delta ──'); console.log('\n── Generate delta ──');
// TODO: walk PREV and PEND, diff per file, build manifest
// ── Phase 5: Commit delta ──────────────────────────────────── const tmpDir = join(deltas, 'tmp', String(seq));
const filesDir = join(tmpDir, 'files');
if (!dry) {
await mkdir(filesDir, { recursive: true });
} else {
console.log(`[dry-run] mkdir -p ${filesDir}`);
}
const manifestChanges = [];
for (const change of changes) {
const deltaFilename = change.path.replaceAll('/', '__') + backend.ext;
const outFile = join(filesDir, deltaFilename);
if (change.status === 'deleted') {
manifestChanges.push({ path: change.path, status: 'deleted' });
continue;
}
const prevFile = join(prev, change.path);
const newFile = join(pend, change.path);
console.log(` [${change.status}] ${change.path}`);
if (!dry) {
await backend.createDelta(
change.status === 'modified' ? prevFile : null,
newFile,
outFile,
);
} else {
console.log(`[dry-run] ${change.status === 'modified'
? `zstd --patch-from ${prevFile} ${newFile} -o ${outFile}`
: `zstd ${newFile} -o ${outFile}`}`);
}
manifestChanges.push({
path: change.path,
status: change.status,
delta: join('files', deltaFilename),
});
}
// ── Phase 5: Write manifest + atomic commit ──────────────────
await setPhase(deltas, state, PHASES.COMMITTING, dry); await setPhase(deltas, state, PHASES.COMMITTING, dry);
console.log('\n── Commit delta ──'); console.log('\n── Commit delta ──');
// TODO: atomic rename DELTAS/tmp/N → DELTAS/N
const manifest = {
seq,
timestamp: new Date().toISOString(),
prev_seq: state.last_complete,
backend: backendName,
changes: manifestChanges,
};
const seqDir = join(deltas, String(seq));
if (!dry) {
await writeFile(join(tmpDir, 'manifest.json'), JSON.stringify(manifest, null, 2) + '\n');
// Atomic rename: tmp/N → N
await rename(tmpDir, seqDir);
console.log(` Committed to ${seqDir}`);
} else {
console.log(`[dry-run] write manifest to ${tmpDir}/manifest.json`);
console.log(`[dry-run] rename ${tmpDir}${seqDir}`);
}
// ── Phase 6: Promote PEND → PREV ──────────────────────────── // ── Phase 6: Promote PEND → PREV ────────────────────────────
await setPhase(deltas, state, PHASES.PROMOTING, dry); await setPhase(deltas, state, PHASES.PROMOTING, dry);
console.log('\n── Promote PEND → PREV ──'); console.log('\n── Promote PEND → PREV ──');
// TODO: mv PEND PREV (swap)
if (!dry) {
await rm(prev, { recursive: true, force: true });
await rename(pend, prev);
console.log(` ${pend}${prev}`);
} else {
console.log(`[dry-run] rm -rf ${prev} && mv ${pend} ${prev}`);
}
// ── Done ───────────────────────────────────────────────────── // ── Done ─────────────────────────────────────────────────────
state.last_complete = seq; state.last_complete = seq;
@@ -63,7 +149,7 @@ export async function runCommand(config) {
state.phase = PHASES.IDLE; state.phase = PHASES.IDLE;
if (!dry) await writeState(deltas, state); if (!dry) await writeState(deltas, state);
console.log(`\nRun complete — seq ${seq} committed.`); console.log(`\nRun complete — seq ${seq} committed. ${manifestChanges.length} file(s) in delta.`);
} }
async function setPhase(deltas, state, phase, dry) { async function setPhase(deltas, state, phase, dry) {