Files
CommTester/ChildWorkerNode/child-worker.js

294 lines
10 KiB
JavaScript

#!/usr/bin/env node
/**
* Node.js implementation of the ChildWorker for CommTester.
*
* Protocol: 4-byte little-endian length prefix + UTF-8 JSON payload
*
* Usage: node child-worker.js --pipe <pipeName> --id <childId>
*/
const net = require('net');
const path = require('path');
// ─────────────────────────────────────────────────────────────────────────────
// Argument Parsing
// ─────────────────────────────────────────────────────────────────────────────
function parseArgs(args) {
let pipeName = null;
let childId = null;
for (let i = 0; i < args.length; i++) {
if (args[i] === '--pipe' && i + 1 < args.length) {
pipeName = args[++i];
} else if (args[i] === '--id' && i + 1 < args.length) {
childId = parseInt(args[++i], 10);
}
}
return { pipeName, childId };
}
const { pipeName, childId } = parseArgs(process.argv.slice(2));
if (!pipeName || childId === null || isNaN(childId)) {
console.error('Usage: node child-worker.js --pipe <pipeName> --id <childId>');
process.exit(2);
}
// ─────────────────────────────────────────────────────────────────────────────
// IPC Protocol Constants (matching IpcKinds.cs)
// ─────────────────────────────────────────────────────────────────────────────
const IpcKinds = {
Hello: 'hello',
Ping: 'ping',
Pong: 'pong',
StartWork: 'startWork',
CancelWork: 'cancelWork',
Log: 'log',
Progress: 'progress',
Result: 'result',
Error: 'error',
};
// ─────────────────────────────────────────────────────────────────────────────
// Frame Serialization
// ─────────────────────────────────────────────────────────────────────────────
/**
* Writes a frame to the socket with 4-byte little-endian length prefix.
* @param {net.Socket} socket
* @param {object} frame
*/
function writeFrame(socket, frame) {
// Add timestamp if not present
if (!frame.timestamp) {
frame.timestamp = new Date().toISOString();
}
const json = Buffer.from(JSON.stringify(frame), 'utf8');
const header = Buffer.alloc(4);
header.writeUInt32LE(json.length, 0);
socket.write(header);
socket.write(json);
}
/**
* Creates a frame reader that emits complete frames.
* @returns {{ push: (chunk: Buffer) => void, onFrame: (callback: (frame: object) => void) => void }}
*/
function createFrameReader() {
let buffer = Buffer.alloc(0);
let frameCallback = null;
return {
push(chunk) {
buffer = Buffer.concat([buffer, chunk]);
while (buffer.length >= 4) {
const length = buffer.readUInt32LE(0);
if (length <= 0 || length > 4 * 1024 * 1024) {
throw new Error(`Invalid frame length: ${length}`);
}
if (buffer.length < 4 + length) {
break; // Wait for more data
}
const jsonBytes = buffer.subarray(4, 4 + length);
buffer = buffer.subarray(4 + length);
const frame = JSON.parse(jsonBytes.toString('utf8'));
if (frameCallback) {
frameCallback(frame);
}
}
},
onFrame(callback) {
frameCallback = callback;
}
};
}
// ─────────────────────────────────────────────────────────────────────────────
// Work Task Management
// ─────────────────────────────────────────────────────────────────────────────
let workAbortController = null;
let currentWorkCorrelationId = null;
async function cancelWork() {
if (workAbortController) {
workAbortController.abort();
workAbortController = null;
currentWorkCorrelationId = null;
}
}
async function startWork(socket, correlationId, steps, delayMs) {
await cancelWork();
currentWorkCorrelationId = correlationId;
workAbortController = new AbortController();
const signal = workAbortController.signal;
// Run work in the background
(async () => {
try {
writeFrame(socket, {
kind: IpcKinds.Log,
correlationId,
payload: { level: 'info', message: `Child ${childId}: work started (${steps} steps).` }
});
for (let i = 1; i <= steps; i++) {
if (signal.aborted) {
return;
}
await sleep(delayMs, signal);
const percent = (i * 100.0) / steps;
writeFrame(socket, {
kind: IpcKinds.Progress,
correlationId,
payload: { step: i, total: steps, percent }
});
// Log every ~20% of progress
if (i % Math.max(1, Math.floor(steps / 5)) === 0) {
writeFrame(socket, {
kind: IpcKinds.Log,
correlationId,
payload: { level: 'debug', message: `Child ${childId}: reached step ${i}/${steps}.` }
});
}
}
writeFrame(socket, {
kind: IpcKinds.Result,
correlationId,
payload: { message: `Child ${childId}: work finished.` }
});
} catch (err) {
if (err.name !== 'AbortError') {
writeFrame(socket, {
kind: IpcKinds.Error,
correlationId,
payload: { message: `Child ${childId}: work failed - ${err.message}` }
});
}
} finally {
if (currentWorkCorrelationId === correlationId) {
workAbortController = null;
currentWorkCorrelationId = null;
}
}
})();
}
function sleep(ms, signal) {
return new Promise((resolve, reject) => {
const timeout = setTimeout(resolve, ms);
if (signal) {
signal.addEventListener('abort', () => {
clearTimeout(timeout);
reject(new DOMException('Aborted', 'AbortError'));
}, { once: true });
}
});
}
// ─────────────────────────────────────────────────────────────────────────────
// Main
// ─────────────────────────────────────────────────────────────────────────────
// .NET uses different pipe paths per platform:
// - Windows: \\.\pipe\{pipeName}
// - Unix/macOS: /tmp/CoreFxPipe_{pipeName}
const pipePath = process.platform === 'win32'
? `\\\\.\\pipe\\${pipeName}`
: `/tmp/CoreFxPipe_${pipeName}`;
const socket = net.createConnection(pipePath, () => {
// Connection established - send hello
writeFrame(socket, {
kind: IpcKinds.Hello,
payload: { childId, pid: process.pid }
});
});
const frameReader = createFrameReader();
frameReader.onFrame((frame) => {
switch (frame.kind) {
case IpcKinds.Ping:
writeFrame(socket, {
kind: IpcKinds.Pong,
correlationId: frame.correlationId,
payload: { childId }
});
break;
case IpcKinds.StartWork: {
const payload = frame.payload || {};
const steps = payload.steps || 25;
const delayMs = payload.delayMs || 150;
const corr = frame.correlationId || crypto.randomUUID().replace(/-/g, '');
startWork(socket, corr, steps, delayMs);
break;
}
case IpcKinds.CancelWork:
cancelWork().then(() => {
writeFrame(socket, {
kind: IpcKinds.Log,
correlationId: frame.correlationId || currentWorkCorrelationId,
payload: { level: 'info', message: `Child ${childId}: work cancelled.` }
});
});
break;
default:
writeFrame(socket, {
kind: IpcKinds.Error,
correlationId: frame.correlationId,
payload: { message: `Unknown command kind '${frame.kind}'.` }
});
break;
}
});
socket.on('data', (chunk) => {
try {
frameReader.push(chunk);
} catch (err) {
console.error('Frame read error:', err.message);
socket.destroy();
}
});
socket.on('error', (err) => {
console.error('Socket error:', err.message);
process.exit(1);
});
socket.on('close', () => {
cancelWork();
process.exit(0);
});
// Handle graceful shutdown
process.on('SIGINT', () => {
cancelWork();
socket.end();
});
process.on('SIGTERM', () => {
cancelWork();
socket.end();
});