#!/usr/bin/env node
/**
 * Node.js implementation of the ChildWorker for CommTester.
 *
 * Protocol: 4-byte little-endian length prefix + UTF-8 JSON payload
 *
 * Usage: node child-worker.js --pipe <name> --id <number>
 */

const crypto = require('crypto');
const net = require('net');
const path = require('path');

// ─────────────────────────────────────────────────────────────────────────────
// Argument Parsing
// ─────────────────────────────────────────────────────────────────────────────

/**
 * Extracts `--pipe <name>` and `--id <number>` from the argument list.
 * Unknown arguments are ignored; a flag without a following value is skipped.
 * @param {string[]} args - Command-line arguments (without node/script path).
 * @returns {{ pipeName: string | null, childId: number | null }}
 *   `childId` is NaN when the value after `--id` is not a base-10 integer.
 */
function parseArgs(args) {
  let pipeName = null;
  let childId = null;

  for (let i = 0; i < args.length; i++) {
    const hasValue = i + 1 < args.length;
    if (args[i] === '--pipe' && hasValue) {
      pipeName = args[++i];
    } else if (args[i] === '--id' && hasValue) {
      childId = Number.parseInt(args[++i], 10);
    }
  }

  return { pipeName, childId };
}

const { pipeName, childId } = parseArgs(process.argv.slice(2));

if (!pipeName || childId === null || Number.isNaN(childId)) {
  console.error('Usage: node child-worker.js --pipe <name> --id <number>');
  process.exit(2);
}

// ─────────────────────────────────────────────────────────────────────────────
// IPC Protocol Constants (matching IpcKinds.cs)
// ─────────────────────────────────────────────────────────────────────────────

const IpcKinds = Object.freeze({
  Hello: 'hello',
  Ping: 'ping',
  Pong: 'pong',
  StartWork: 'startWork',
  CancelWork: 'cancelWork',
  Log: 'log',
  Progress: 'progress',
  Result: 'result',
  Error: 'error',
});

/** Size of the little-endian length prefix that precedes every frame. */
const HEADER_BYTES = 4;

/** Largest JSON payload the reader accepts before treating the stream as corrupt. */
const MAX_FRAME_BYTES = 4 * 1024 * 1024;

/** Defaults applied when a StartWork payload omits (or sends unusable) values. */
const DEFAULT_STEPS = 25;
const DEFAULT_DELAY_MS = 150;

// ─────────────────────────────────────────────────────────────────────────────
// Frame Serialization
// ─────────────────────────────────────────────────────────────────────────────

/**
 * Writes a frame to the socket with 4-byte little-endian length prefix.
 *
 * A `timestamp` (ISO-8601) is added when the frame has none; the caller's
 * object is left untouched. Frames are dropped when the socket can no longer
 * be written to, so late writes during shutdown do not raise socket errors.
 * @param {net.Socket} socket
 * @param {object} frame
 */
function writeFrame(socket, frame) {
  if (socket.destroyed || !socket.writable) {
    return;
  }

  const outgoing = frame.timestamp
    ? frame
    : { ...frame, timestamp: new Date().toISOString() };

  const json = Buffer.from(JSON.stringify(outgoing), 'utf8');
  const header = Buffer.alloc(HEADER_BYTES);
  header.writeUInt32LE(json.length, 0);

  // Header and payload go out in a single write call.
  socket.write(Buffer.concat([header, json]));
}

/**
 * Creates a frame reader that emits complete frames.
 *
 * `push` accumulates incoming bytes and invokes the registered callback once
 * per complete frame. It throws when a length prefix is zero or exceeds
 * MAX_FRAME_BYTES, or when a payload is not valid JSON.
 * @returns {{ push: (chunk: Buffer) => void, onFrame: (callback: (frame: object) => void) => void }}
 */
function createFrameReader() {
  let buffer = Buffer.alloc(0);
  let frameCallback = null;

  return {
    push(chunk) {
      buffer = buffer.length === 0 ? chunk : Buffer.concat([buffer, chunk]);

      while (buffer.length >= HEADER_BYTES) {
        const length = buffer.readUInt32LE(0);
        if (length <= 0 || length > MAX_FRAME_BYTES) {
          throw new Error(`Invalid frame length: ${length}`);
        }

        const frameEnd = HEADER_BYTES + length;
        if (buffer.length < frameEnd) {
          break; // Wait for more data
        }

        const jsonBytes = buffer.subarray(HEADER_BYTES, frameEnd);
        buffer = buffer.subarray(frameEnd);

        const frame = JSON.parse(jsonBytes.toString('utf8'));
        if (frameCallback) {
          frameCallback(frame);
        }
      }
    },

    onFrame(callback) {
      frameCallback = callback;
    },
  };
}

// ─────────────────────────────────────────────────────────────────────────────
// Work Task Management
// ─────────────────────────────────────────────────────────────────────────────

let workAbortController = null;
let currentWorkCorrelationId = null;

/**
 * Synchronously aborts the running job (if any) and clears the tracking state.
 * Kept synchronous so that back-to-back StartWork frames cannot interleave
 * between "cancel the old job" and "register the new job".
 */
function abortCurrentWork() {
  if (!workAbortController) {
    return;
  }
  workAbortController.abort();
  workAbortController = null;
  currentWorkCorrelationId = null;
}

/**
 * Promise-returning wrapper around abortCurrentWork.
 * @returns {Promise<void>}
 */
async function cancelWork() {
  abortCurrentWork();
}

/**
 * Cancels any running job and starts a new simulated job in the background.
 *
 * The job emits one Log frame at start, one Progress frame per step, a debug
 * Log frame roughly every 20% of the steps, and a final Result frame. A
 * failure other than cancellation is reported as an Error frame.
 * @param {net.Socket} socket - Socket that receives the frames.
 * @param {string} correlationId - Echoed on every frame of this job.
 * @param {number} steps - Number of steps to simulate.
 * @param {number} delayMs - Delay before each step, in milliseconds.
 * @returns {Promise<void>} Resolves once the job is started (not finished).
 */
async function startWork(socket, correlationId, steps, delayMs) {
  abortCurrentWork();

  const controller = new AbortController();
  const { signal } = controller;
  workAbortController = controller;
  currentWorkCorrelationId = correlationId;

  // Run work in the background
  (async () => {
    try {
      writeFrame(socket, {
        kind: IpcKinds.Log,
        correlationId,
        payload: { level: 'info', message: `Child ${childId}: work started (${steps} steps).` },
      });

      const logInterval = Math.max(1, Math.floor(steps / 5));

      for (let i = 1; i <= steps; i++) {
        if (signal.aborted) {
          return;
        }

        await sleep(delayMs, signal);

        const percent = (i * 100.0) / steps;
        writeFrame(socket, {
          kind: IpcKinds.Progress,
          correlationId,
          payload: { step: i, total: steps, percent },
        });

        // Log every ~20% of progress
        if (i % logInterval === 0) {
          writeFrame(socket, {
            kind: IpcKinds.Log,
            correlationId,
            payload: { level: 'debug', message: `Child ${childId}: reached step ${i}/${steps}.` },
          });
        }
      }

      writeFrame(socket, {
        kind: IpcKinds.Result,
        correlationId,
        payload: { message: `Child ${childId}: work finished.` },
      });
    } catch (err) {
      if (err.name !== 'AbortError') {
        writeFrame(socket, {
          kind: IpcKinds.Error,
          correlationId,
          payload: { message: `Child ${childId}: work failed - ${err.message}` },
        });
      }
    } finally {
      // Only clear the shared state if it still belongs to this job; a newer
      // job may already have replaced it (possibly with the same correlation id).
      if (workAbortController === controller) {
        workAbortController = null;
        currentWorkCorrelationId = null;
      }
    }
  })();
}

/**
 * Resolves after `ms` milliseconds, or rejects with an AbortError
 * DOMException as soon as `signal` is aborted.
 * @param {number} ms - Delay in milliseconds.
 * @param {AbortSignal} [signal] - Optional cancellation signal.
 * @returns {Promise<void>}
 */
function sleep(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal && signal.aborted) {
      reject(new DOMException('Aborted', 'AbortError'));
      return;
    }

    let timeout = null;

    const onAbort = () => {
      clearTimeout(timeout);
      reject(new DOMException('Aborted', 'AbortError'));
    };

    timeout = setTimeout(() => {
      // Detach the listener so repeated sleeps do not pile up on one signal.
      if (signal) {
        signal.removeEventListener('abort', onAbort);
      }
      resolve();
    }, ms);

    if (signal) {
      signal.addEventListener('abort', onAbort, { once: true });
    }
  });
}

/**
 * Coerces a payload value to a positive finite number.
 * @param {unknown} value - Raw value taken from a frame payload.
 * @param {number} fallback - Returned when `value` is missing, zero, negative,
 *   or not numeric.
 * @returns {number}
 */
function positiveNumberOr(value, fallback) {
  const numeric = Number(value);
  return Number.isFinite(numeric) && numeric > 0 ? numeric : fallback;
}

// ─────────────────────────────────────────────────────────────────────────────
// Main
// ─────────────────────────────────────────────────────────────────────────────

const pipePath = process.platform === 'win32'
  ? `\\\\.\\pipe\\${pipeName}`
  : `/tmp/${pipeName}`;

const socket = net.createConnection(pipePath, () => {
  // Connection established - send hello
  writeFrame(socket, {
    kind: IpcKinds.Hello,
    payload: { childId, pid: process.pid },
  });
});

const frameReader = createFrameReader();

frameReader.onFrame((frame) => {
  // A frame that parsed to a non-object (e.g. `null`) falls through to the
  // "unknown kind" reply instead of throwing here.
  const { kind, correlationId, payload } = frame ?? {};

  switch (kind) {
    case IpcKinds.Ping:
      writeFrame(socket, {
        kind: IpcKinds.Pong,
        correlationId,
        payload: { childId },
      });
      break;

    case IpcKinds.StartWork: {
      const options = payload || {};
      const steps = positiveNumberOr(options.steps, DEFAULT_STEPS);
      const delayMs = positiveNumberOr(options.delayMs, DEFAULT_DELAY_MS);
      const corr = correlationId || crypto.randomUUID().replace(/-/g, '');
      startWork(socket, corr, steps, delayMs);
      break;
    }

    case IpcKinds.CancelWork: {
      // Capture the id first: cancelling clears currentWorkCorrelationId.
      const corr = correlationId || currentWorkCorrelationId;
      cancelWork().then(() => {
        writeFrame(socket, {
          kind: IpcKinds.Log,
          correlationId: corr,
          payload: { level: 'info', message: `Child ${childId}: work cancelled.` },
        });
      });
      break;
    }

    default:
      writeFrame(socket, {
        kind: IpcKinds.Error,
        correlationId,
        payload: { message: `Unknown command kind '${kind}'.` },
      });
      break;
  }
});

socket.on('data', (chunk) => {
  try {
    frameReader.push(chunk);
  } catch (err) {
    console.error('Frame read error:', err.message);
    socket.destroy();
  }
});

socket.on('error', (err) => {
  console.error('Socket error:', err.message);
  process.exit(1);
});

socket.on('close', () => {
  abortCurrentWork();
  process.exit(0);
});

/** Stops any running job and half-closes the socket; 'close' ends the process. */
function shutdown() {
  abortCurrentWork();
  socket.end();
}

// Handle graceful shutdown
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);