From f7b038faf9e51f22b9add7b285d6c088d489fa9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Holger=20B=C3=B6rchers?= Date: Tue, 24 Feb 2026 20:21:21 +0100 Subject: [PATCH] feat: Add Node.js worker implementation and integrate worker type selection in UI --- ChildWorkerNode/child-worker.js | 290 ++++++++++++++++++++++++++++++++ ChildWorkerNode/package.json | 18 ++ ParentAvalonia/MainViewModel.cs | 70 +++++--- ParentAvalonia/MainWindow.axaml | 3 + 4 files changed, 361 insertions(+), 20 deletions(-) create mode 100644 ChildWorkerNode/child-worker.js create mode 100644 ChildWorkerNode/package.json diff --git a/ChildWorkerNode/child-worker.js b/ChildWorkerNode/child-worker.js new file mode 100644 index 0000000..e65bf66 --- /dev/null +++ b/ChildWorkerNode/child-worker.js @@ -0,0 +1,290 @@ +#!/usr/bin/env node +/** + * Node.js implementation of the ChildWorker for CommTester. + * + * Protocol: 4-byte little-endian length prefix + UTF-8 JSON payload + * + * Usage: node child-worker.js --pipe --id + */ + +const net = require('net'); +const path = require('path'); + +// ───────────────────────────────────────────────────────────────────────────── +// Argument Parsing +// ───────────────────────────────────────────────────────────────────────────── + +function parseArgs(args) { + let pipeName = null; + let childId = null; + + for (let i = 0; i < args.length; i++) { + if (args[i] === '--pipe' && i + 1 < args.length) { + pipeName = args[++i]; + } else if (args[i] === '--id' && i + 1 < args.length) { + childId = parseInt(args[++i], 10); + } + } + + return { pipeName, childId }; +} + +const { pipeName, childId } = parseArgs(process.argv.slice(2)); + +if (!pipeName || childId === null || isNaN(childId)) { + console.error('Usage: node child-worker.js --pipe --id '); + process.exit(2); +} + +// ───────────────────────────────────────────────────────────────────────────── +// IPC Protocol Constants (matching IpcKinds.cs) +// ───────────────────────────────────────────────────────────────────────────── + +const IpcKinds = { + Hello: 'hello', + Ping: 'ping', + Pong: 'pong', + StartWork: 'startWork', + CancelWork: 'cancelWork', + Log: 'log', + Progress: 'progress', + Result: 'result', + Error: 'error', +}; + +// ───────────────────────────────────────────────────────────────────────────── +// Frame Serialization +// ───────────────────────────────────────────────────────────────────────────── + +/** + * Writes a frame to the socket with 4-byte little-endian length prefix. + * @param {net.Socket} socket + * @param {object} frame + */ +function writeFrame(socket, frame) { + // Add timestamp if not present + if (!frame.timestamp) { + frame.timestamp = new Date().toISOString(); + } + + const json = Buffer.from(JSON.stringify(frame), 'utf8'); + const header = Buffer.alloc(4); + header.writeUInt32LE(json.length, 0); + + socket.write(header); + socket.write(json); +} + +/** + * Creates a frame reader that emits complete frames. + * @returns {{ push: (chunk: Buffer) => void, onFrame: (callback: (frame: object) => void) => void }} + */ +function createFrameReader() { + let buffer = Buffer.alloc(0); + let frameCallback = null; + + return { + push(chunk) { + buffer = Buffer.concat([buffer, chunk]); + + while (buffer.length >= 4) { + const length = buffer.readUInt32LE(0); + + if (length <= 0 || length > 4 * 1024 * 1024) { + throw new Error(`Invalid frame length: ${length}`); + } + + if (buffer.length < 4 + length) { + break; // Wait for more data + } + + const jsonBytes = buffer.subarray(4, 4 + length); + buffer = buffer.subarray(4 + length); + + const frame = JSON.parse(jsonBytes.toString('utf8')); + if (frameCallback) { + frameCallback(frame); + } + } + }, + onFrame(callback) { + frameCallback = callback; + } + }; +} + +// ───────────────────────────────────────────────────────────────────────────── +// Work Task Management +// ───────────────────────────────────────────────────────────────────────────── + +let workAbortController = null; +let currentWorkCorrelationId = null; + +async function cancelWork() { + if (workAbortController) { + workAbortController.abort(); + workAbortController = null; + currentWorkCorrelationId = null; + } +} + +async function startWork(socket, correlationId, steps, delayMs) { + await cancelWork(); + + currentWorkCorrelationId = correlationId; + workAbortController = new AbortController(); + const signal = workAbortController.signal; + + // Run work in the background + (async () => { + try { + writeFrame(socket, { + kind: IpcKinds.Log, + correlationId, + payload: { level: 'info', message: `Child ${childId}: work started (${steps} steps).` } + }); + + for (let i = 1; i <= steps; i++) { + if (signal.aborted) { + return; + } + + await sleep(delayMs, signal); + + const percent = (i * 100.0) / steps; + writeFrame(socket, { + kind: IpcKinds.Progress, + correlationId, + payload: { step: i, total: steps, percent } + }); + + // Log every ~20% of progress + if (i % Math.max(1, Math.floor(steps / 5)) === 0) { + writeFrame(socket, { + kind: IpcKinds.Log, + correlationId, + payload: { level: 'debug', message: `Child ${childId}: reached step ${i}/${steps}.` } + }); + } + } + + writeFrame(socket, { + kind: IpcKinds.Result, + correlationId, + payload: { message: `Child ${childId}: work finished.` } + }); + } catch (err) { + if (err.name !== 'AbortError') { + writeFrame(socket, { + kind: IpcKinds.Error, + correlationId, + payload: { message: `Child ${childId}: work failed - ${err.message}` } + }); + } + } finally { + if (currentWorkCorrelationId === correlationId) { + workAbortController = null; + currentWorkCorrelationId = null; + } + } + })(); +} + +function sleep(ms, signal) { + return new Promise((resolve, reject) => { + const timeout = setTimeout(resolve, ms); + if (signal) { + signal.addEventListener('abort', () => { + clearTimeout(timeout); + reject(new DOMException('Aborted', 'AbortError')); + }, { once: true }); + } + }); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Main +// ───────────────────────────────────────────────────────────────────────────── + +const pipePath = process.platform === 'win32' + ? `\\\\.\\pipe\\${pipeName}` + : `/tmp/${pipeName}`; + +const socket = net.createConnection(pipePath, () => { + // Connection established - send hello + writeFrame(socket, { + kind: IpcKinds.Hello, + payload: { childId, pid: process.pid } + }); +}); + +const frameReader = createFrameReader(); + +frameReader.onFrame((frame) => { + switch (frame.kind) { + case IpcKinds.Ping: + writeFrame(socket, { + kind: IpcKinds.Pong, + correlationId: frame.correlationId, + payload: { childId } + }); + break; + + case IpcKinds.StartWork: { + const payload = frame.payload || {}; + const steps = payload.steps || 25; + const delayMs = payload.delayMs || 150; + const corr = frame.correlationId || crypto.randomUUID().replace(/-/g, ''); + startWork(socket, corr, steps, delayMs); + break; + } + + case IpcKinds.CancelWork: + cancelWork().then(() => { + writeFrame(socket, { + kind: IpcKinds.Log, + correlationId: frame.correlationId || currentWorkCorrelationId, + payload: { level: 'info', message: `Child ${childId}: work cancelled.` } + }); + }); + break; + + default: + writeFrame(socket, { + kind: IpcKinds.Error, + correlationId: frame.correlationId, + payload: { message: `Unknown command kind '${frame.kind}'.` } + }); + break; + } +}); + +socket.on('data', (chunk) => { + try { + frameReader.push(chunk); + } catch (err) { + console.error('Frame read error:', err.message); + socket.destroy(); + } +}); + +socket.on('error', (err) => { + console.error('Socket error:', err.message); + process.exit(1); +}); + +socket.on('close', () => { + cancelWork(); + process.exit(0); +}); + +// Handle graceful shutdown +process.on('SIGINT', () => { + cancelWork(); + socket.end(); +}); + +process.on('SIGTERM', () => { + cancelWork(); + socket.end(); +}); diff --git a/ChildWorkerNode/package.json b/ChildWorkerNode/package.json new file mode 100644 index 0000000..5f05173 --- /dev/null +++ b/ChildWorkerNode/package.json @@ -0,0 +1,18 @@ +{ + "name": "child-worker-node", + "version": "1.0.0", + "description": "Node.js implementation of CommTester ChildWorker", + "main": "child-worker.js", + "scripts": { + "start": "node child-worker.js" + }, + "keywords": [ + "ipc", + "named-pipe", + "child-worker" + ], + "license": "MIT", + "engines": { + "node": ">=18.0.0" + } +} diff --git a/ParentAvalonia/MainViewModel.cs b/ParentAvalonia/MainViewModel.cs index 57fe2ad..5300b5c 100644 --- a/ParentAvalonia/MainViewModel.cs +++ b/ParentAvalonia/MainViewModel.cs @@ -7,12 +7,27 @@ using CommIpc; namespace ParentAvalonia; +public enum WorkerType +{ + CSharp, + NodeJs, +} + public sealed class MainViewModel : NotifyBase, IAsyncDisposable { private ChildSession? _selectedChild; private bool _childrenStarted; + private WorkerType _selectedWorkerType = WorkerType.CSharp; private readonly CancellationTokenSource _shutdownCts = new(); + public WorkerType[] WorkerTypes { get; } = Enum.GetValues(); + + public WorkerType SelectedWorkerType + { + get => _selectedWorkerType; + set => SetField(ref _selectedWorkerType, value); + } + public ObservableCollection Children { get; } = new(); public ChildSession? SelectedChild @@ -110,36 +125,51 @@ public sealed class MainViewModel : NotifyBase, IAsyncDisposable _ = Task.Run(() => ReceiveLoopAsync(session), _shutdownCts.Token); } - private static Process StartChildProcess(int childId, string pipeName) + private Process StartChildProcess(int childId, string pipeName) { - // Cross-platform friendly: prefer running the built dll via 'dotnet'. string? solutionRoot = TryFindSolutionRoot(AppContext.BaseDirectory); if (solutionRoot is null) { throw new InvalidOperationException("Could not find solution root (CommTester.slnx)."); } - string debugDll = Path.Combine(solutionRoot, "ChildWorker", "bin", "Debug", "net10.0", "ChildWorker.dll"); - string releaseDll = Path.Combine(solutionRoot, "ChildWorker", "bin", "Release", "net10.0", "ChildWorker.dll"); - - string childDll = - File.Exists(debugDll) ? debugDll - : File.Exists(releaseDll) ? releaseDll - : string.Empty; - string fileName; string arguments; - if (!string.IsNullOrWhiteSpace(childDll)) + + switch (SelectedWorkerType) { - fileName = "dotnet"; - arguments = $"\"{childDll}\" --pipe \"{pipeName}\" --id {childId}"; - } - else - { - // Last-resort: run via project (slower but robust in fresh checkouts). - string csproj = Path.Combine(solutionRoot, "ChildWorker", "ChildWorker.csproj"); - fileName = "dotnet"; - arguments = $"run --project \"{csproj}\" -- --pipe \"{pipeName}\" --id {childId}"; + case WorkerType.NodeJs: + string nodeScript = Path.Combine(solutionRoot, "ChildWorkerNode", "child-worker.js"); + if (!File.Exists(nodeScript)) + { + throw new FileNotFoundException($"Node.js worker not found: {nodeScript}"); + } + fileName = "node"; + arguments = $"\"{nodeScript}\" --pipe \"{pipeName}\" --id {childId}"; + break; + + case WorkerType.CSharp: + default: + string debugDll = Path.Combine(solutionRoot, "ChildWorker", "bin", "Debug", "net10.0", "ChildWorker.dll"); + string releaseDll = Path.Combine(solutionRoot, "ChildWorker", "bin", "Release", "net10.0", "ChildWorker.dll"); + + string childDll = + File.Exists(debugDll) ? debugDll + : File.Exists(releaseDll) ? releaseDll + : string.Empty; + + if (!string.IsNullOrWhiteSpace(childDll)) + { + fileName = "dotnet"; + arguments = $"\"{childDll}\" --pipe \"{pipeName}\" --id {childId}"; + } + else + { + string csproj = Path.Combine(solutionRoot, "ChildWorker", "ChildWorker.csproj"); + fileName = "dotnet"; + arguments = $"run --project \"{csproj}\" -- --pipe \"{pipeName}\" --id {childId}"; + } + break; } var psi = new ProcessStartInfo diff --git a/ParentAvalonia/MainWindow.axaml b/ParentAvalonia/MainWindow.axaml index eae66dc..02c38f6 100644 --- a/ParentAvalonia/MainWindow.axaml +++ b/ParentAvalonia/MainWindow.axaml @@ -8,6 +8,9 @@ +