Files
CommTester/ChildWorker/Program.cs
Holger Börchers 894fbbfa5a Initial commit
2026-01-30 15:31:43 +01:00

195 lines
5.5 KiB
C#

using System.IO.Pipes;
using CommIpc;
static (string? pipeName, int? childId) ParseArgs(string[] args)
{
string? pipe = null;
int? id = null;
for (int i = 0; i < args.Length; i++)
{
switch (args[i])
{
case "--pipe" when i + 1 < args.Length:
pipe = args[++i];
break;
case "--id" when i + 1 < args.Length:
if (int.TryParse(args[++i], out int parsed))
{
id = parsed;
}
break;
}
}
return (pipe, id);
}
var (pipeName, childId) = ParseArgs(args);
if (string.IsNullOrWhiteSpace(pipeName) || childId is null)
{
Console.Error.WriteLine("Usage: ChildWorker --pipe <pipeName> --id <childId>");
return 2;
}
using var appCts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
appCts.Cancel();
};
using var pipe = new NamedPipeClientStream(
serverName: ".",
pipeName: pipeName,
direction: PipeDirection.InOut,
options: PipeOptions.Asynchronous);
await pipe.ConnectAsync(appCts.Token);
var writeLock = new SemaphoreSlim(1, 1);
async Task SendAsync(IpcFrame frame, CancellationToken ct)
{
await writeLock.WaitAsync(ct);
try
{
await IpcProtocol.WriteFrameAsync(pipe, frame, ct);
}
finally
{
writeLock.Release();
}
}
await SendAsync(
new IpcFrame(
Kind: IpcKinds.Hello,
Payload: IpcProtocol.ToJsonElement(new { childId = childId.Value, pid = Environment.ProcessId })),
appCts.Token);
CancellationTokenSource? workCts = null;
Task? workTask = null;
string? currentWorkCorrelationId = null;
async Task CancelWorkAsync()
{
try
{
workCts?.Cancel();
if (workTask is not null)
{
await workTask;
}
}
catch (OperationCanceledException)
{
// expected
}
finally
{
workCts?.Dispose();
workCts = null;
workTask = null;
currentWorkCorrelationId = null;
}
}
async Task StartWorkAsync(string correlationId, int steps, int delayMs, CancellationToken ct)
{
await CancelWorkAsync();
currentWorkCorrelationId = correlationId;
workCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var wct = workCts.Token;
workTask = Task.Run(async () =>
{
await SendAsync(new IpcFrame(
Kind: IpcKinds.Log,
CorrelationId: correlationId,
Payload: IpcProtocol.ToJsonElement(new { level = "info", message = $"Child {childId}: work started ({steps} steps)." })), wct);
for (int i = 1; i <= steps; i++)
{
wct.ThrowIfCancellationRequested();
await Task.Delay(delayMs, wct);
double percent = (i * 100.0) / steps;
await SendAsync(new IpcFrame(
Kind: IpcKinds.Progress,
CorrelationId: correlationId,
Payload: IpcProtocol.ToJsonElement(new { step = i, total = steps, percent })), wct);
if (i % Math.Max(1, steps / 5) == 0)
{
await SendAsync(new IpcFrame(
Kind: IpcKinds.Log,
CorrelationId: correlationId,
Payload: IpcProtocol.ToJsonElement(new { level = "debug", message = $"Child {childId}: reached step {i}/{steps}." })), wct);
}
}
await SendAsync(new IpcFrame(
Kind: IpcKinds.Result,
CorrelationId: correlationId,
Payload: IpcProtocol.ToJsonElement(new { message = $"Child {childId}: work finished." })), wct);
}, wct);
}
try
{
while (!appCts.IsCancellationRequested)
{
IpcFrame? frame = await IpcProtocol.ReadFrameAsync(pipe, cancellationToken: appCts.Token);
if (frame is null)
{
break; // parent disconnected
}
switch (frame.Kind)
{
case IpcKinds.Ping:
await SendAsync(new IpcFrame(
Kind: IpcKinds.Pong,
CorrelationId: frame.CorrelationId,
Payload: IpcProtocol.ToJsonElement(new { childId = childId.Value })), appCts.Token);
break;
case IpcKinds.StartWork:
{
var payload = IpcProtocol.FromJsonElement<StartWorkPayload>(frame.Payload) ?? new StartWorkPayload();
string corr = frame.CorrelationId ?? Guid.NewGuid().ToString("N");
await StartWorkAsync(corr, payload.Steps, payload.DelayMs, appCts.Token);
break;
}
case IpcKinds.CancelWork:
await CancelWorkAsync();
await SendAsync(new IpcFrame(
Kind: IpcKinds.Log,
CorrelationId: frame.CorrelationId ?? currentWorkCorrelationId,
Payload: IpcProtocol.ToJsonElement(new { level = "info", message = $"Child {childId}: work cancelled." })), appCts.Token);
break;
default:
await SendAsync(new IpcFrame(
Kind: IpcKinds.Error,
CorrelationId: frame.CorrelationId,
Payload: IpcProtocol.ToJsonElement(new { message = $"Unknown command kind '{frame.Kind}'." })), appCts.Token);
break;
}
}
}
catch (OperationCanceledException)
{
// shutting down
}
finally
{
await CancelWorkAsync();
}
return 0;
file sealed record StartWorkPayload(int Steps = 25, int DelayMs = 150);