using System.IO.Pipes; using CommIpc; static (string? pipeName, int? childId) ParseArgs(string[] args) { string? pipe = null; int? id = null; for (int i = 0; i < args.Length; i++) { switch (args[i]) { case "--pipe" when i + 1 < args.Length: pipe = args[++i]; break; case "--id" when i + 1 < args.Length: if (int.TryParse(args[++i], out int parsed)) { id = parsed; } break; } } return (pipe, id); } var (pipeName, childId) = ParseArgs(args); if (string.IsNullOrWhiteSpace(pipeName) || childId is null) { Console.Error.WriteLine("Usage: ChildWorker --pipe --id "); return 2; } using var appCts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; appCts.Cancel(); }; using var pipe = new NamedPipeClientStream(serverName: ".", pipeName: pipeName, direction: PipeDirection.InOut, options: PipeOptions.Asynchronous); await pipe.ConnectAsync(appCts.Token); var writeLock = new SemaphoreSlim(1, 1); async Task SendAsync(IpcFrame frame, CancellationToken ct) { await writeLock.WaitAsync(ct); try { await IpcProtocol.WriteFrameAsync(pipe, frame, ct); } finally { writeLock.Release(); } } await SendAsync( new IpcFrame(Kind: IpcKinds.Hello, Payload: IpcProtocol.ToJsonElement(new { childId = childId.Value, pid = Environment.ProcessId })), appCts.Token ); CancellationTokenSource? workCts = null; Task? workTask = null; string? currentWorkCorrelationId = null; async Task CancelWorkAsync() { try { workCts?.Cancel(); if (workTask is not null) { await workTask; } } catch (OperationCanceledException) { // expected } finally { workCts?.Dispose(); workCts = null; workTask = null; currentWorkCorrelationId = null; } } async Task StartWorkAsync(string correlationId, int steps, int delayMs, CancellationToken ct) { await CancelWorkAsync(); currentWorkCorrelationId = correlationId; workCts = CancellationTokenSource.CreateLinkedTokenSource(ct); var wct = workCts.Token; workTask = Task.Run( async () => { await SendAsync( new IpcFrame( Kind: IpcKinds.Log, CorrelationId: correlationId, Payload: IpcProtocol.ToJsonElement(new { level = "info", message = $"Child {childId}: work started ({steps} steps)." }) ), wct ); for (int i = 1; i <= steps; i++) { wct.ThrowIfCancellationRequested(); await Task.Delay(delayMs, wct); double percent = (i * 100.0) / steps; await SendAsync( new IpcFrame( Kind: IpcKinds.Progress, CorrelationId: correlationId, Payload: IpcProtocol.ToJsonElement( new { step = i, total = steps, percent, } ) ), wct ); if (i % Math.Max(1, steps / 5) == 0) { await SendAsync( new IpcFrame( Kind: IpcKinds.Log, CorrelationId: correlationId, Payload: IpcProtocol.ToJsonElement(new { level = "debug", message = $"Child {childId}: reached step {i}/{steps}." }) ), wct ); } } await SendAsync( new IpcFrame( Kind: IpcKinds.Result, CorrelationId: correlationId, Payload: IpcProtocol.ToJsonElement(new { message = $"Child {childId}: work finished." }) ), wct ); }, wct ); } try { while (!appCts.IsCancellationRequested) { IpcFrame? frame = await IpcProtocol.ReadFrameAsync(pipe, cancellationToken: appCts.Token); if (frame is null) { break; // parent disconnected } switch (frame.Kind) { case IpcKinds.Ping: await SendAsync( new IpcFrame(Kind: IpcKinds.Pong, CorrelationId: frame.CorrelationId, Payload: IpcProtocol.ToJsonElement(new { childId = childId.Value })), appCts.Token ); break; case IpcKinds.StartWork: { var payload = IpcProtocol.FromJsonElement(frame.Payload) ?? new StartWorkPayload(); string corr = frame.CorrelationId ?? Guid.NewGuid().ToString("N"); await StartWorkAsync(corr, payload.Steps, payload.DelayMs, appCts.Token); break; } case IpcKinds.CancelWork: await CancelWorkAsync(); await SendAsync( new IpcFrame( Kind: IpcKinds.Log, CorrelationId: frame.CorrelationId ?? currentWorkCorrelationId, Payload: IpcProtocol.ToJsonElement(new { level = "info", message = $"Child {childId}: work cancelled." }) ), appCts.Token ); break; default: await SendAsync( new IpcFrame( Kind: IpcKinds.Error, CorrelationId: frame.CorrelationId, Payload: IpcProtocol.ToJsonElement(new { message = $"Unknown command kind '{frame.Kind}'." }) ), appCts.Token ); break; } } } catch (OperationCanceledException) { // shutting down } finally { await CancelWorkAsync(); } return 0; file sealed record StartWorkPayload(int Steps = 25, int DelayMs = 150);