195 lines
5.5 KiB
C#
195 lines
5.5 KiB
C#
using System.IO.Pipes;
|
|
using CommIpc;
|
|
|
|
// Parses the worker's command line.
//
// Recognized options (each consumes the argument that follows it):
//   --pipe <pipeName>   value is taken verbatim
//   --id <childId>      value must be an integer
//
// Returns (pipeName, childId). An element is null when its option is absent, is the
// last argument (so it has no value), or - for --id - does not parse as an integer.
// Unrecognized arguments are skipped. When an option is repeated, the last occurrence
// that yields a value wins.
static (string? pipeName, int? childId) ParseArgs(string[] args)
{
    string? resolvedPipe = null;
    int? resolvedId = null;

    for (int i = 0; i < args.Length; i++)
    {
        switch (args[i])
        {
            case "--pipe" when i + 1 < args.Length:
                resolvedPipe = args[++i];
                break;

            case "--id" when i + 1 < args.Length:
                // The id is produced by another process, not typed by a user, so parse it
                // culture-invariantly; the current culture must not affect what is accepted.
                if (int.TryParse(
                        args[++i],
                        System.Globalization.NumberStyles.Integer,
                        System.Globalization.CultureInfo.InvariantCulture,
                        out int value))
                {
                    resolvedId = value;
                }

                break;
        }
    }

    return (resolvedPipe, resolvedId);
}
|
|
|
|
// Both --pipe and --id are required. When either is missing or unusable, print the
// usage line to stderr and exit with code 2.
(string? pipeName, int? childId) = ParseArgs(args);

// The checks stay inline in the condition so that pipeName and childId are known to
// be non-null for the rest of the program.
if (childId is null || string.IsNullOrWhiteSpace(pipeName))
{
    const string usage = "Usage: ChildWorker --pipe <pipeName> --id <childId>";
    Console.Error.WriteLine(usage);
    return 2;
}
|
|
|
|
// Application-wide cancellation source. A console cancel key press (Ctrl+C) is turned
// into a cooperative shutdown request instead of an immediate process termination.
using CancellationTokenSource appCts = new();

void OnCancelKeyPress(object? sender, ConsoleCancelEventArgs e)
{
    // Cancel = true asks the runtime not to terminate the process; the rest of the
    // program observes appCts and winds down on its own.
    e.Cancel = true;
    appCts.Cancel();
}

Console.CancelKeyPress += OnCancelKeyPress;
|
|
|
|
// Client end of the bidirectional pipe to the parent process. "." addresses a pipe
// server on the local machine; the pipe is opened for asynchronous I/O.
using var pipe = new NamedPipeClientStream(
    serverName: ".",
    pipeName: pipeName,
    direction: PipeDirection.InOut,
    options: PipeOptions.Asynchronous);

try
{
    // Waits until the server side of the pipe accepts the connection, or until
    // cancellation is requested.
    await pipe.ConnectAsync(appCts.Token);
}
catch (OperationCanceledException)
{
    // Cancellation was requested before the connection was established. Treat it as
    // a normal shutdown (exit code 0), the same outcome as cancellation during the
    // message loop, instead of letting the exception escape as an unhandled crash.
    return 0;
}
|
|
|
|
// Guards the pipe's write side. Frames are sent both from the command loop and from
// the background work task, so at most one write may be in flight at a time.
SemaphoreSlim writeLock = new(initialCount: 1, maxCount: 1);

// Sends a single frame to the parent while holding the write lock.
//   frame - the frame to write.
//   ct    - cancels both the wait for the lock and the write itself.
// The lock is released even when the write throws.
// NOTE(review): if IpcProtocol.WriteFrameAsync can be interrupted mid-frame by ct, a
// cancelled write could leave a partial frame on the pipe - confirm against IpcProtocol.
async Task SendAsync(IpcFrame frame, CancellationToken ct)
{
    await writeLock.WaitAsync(ct);

    try
    {
        await IpcProtocol.WriteFrameAsync(pipe, frame, ct);
    }
    finally
    {
        writeLock.Release();
    }
}
|
|
|
|
// Handshake: announce this worker to the parent with its child id and OS process id.
try
{
    await SendAsync(
        new IpcFrame(
            Kind: IpcKinds.Hello,
            Payload: IpcProtocol.ToJsonElement(new { childId = childId.Value, pid = Environment.ProcessId })),
        appCts.Token);
}
catch (OperationCanceledException)
{
    // Cancellation was requested during the handshake. Exit cleanly with code 0, the
    // same outcome as cancellation during the message loop, rather than crashing with
    // an unhandled exception.
    return 0;
}
|
|
|
|
// Bookkeeping for the single tracked work item. All three are null until work is
// started and are reset to null by CancelWorkAsync.
CancellationTokenSource? workCts = null;       // cancels the tracked work item
Task? workTask = null;                         // the tracked background task
string? currentWorkCorrelationId = null;       // correlation id the work was started with

// Cancels the tracked work item (if any), waits for its task to finish, and resets
// the bookkeeping above. Safe to call when nothing is tracked.
//
// This method never rethrows the work task's outcome: cancellation is the expected
// result, and any other failure is reported on stderr. Rethrowing would surface a
// stale failure in whichever unrelated command happens to call this next - including
// the shutdown path - and terminate the process.
async Task CancelWorkAsync()
{
    try
    {
        workCts?.Cancel();

        if (workTask is not null)
        {
            await workTask;
        }
    }
    catch (OperationCanceledException)
    {
        // expected
    }
    catch (Exception ex)
    {
        // The work item had already failed (for example, a write to a broken pipe).
        // Record it instead of letting it escape from here.
        Console.Error.WriteLine($"ChildWorker (pid {Environment.ProcessId}): background work failed: {ex.Message}");
    }
    finally
    {
        workCts?.Dispose();
        workCts = null;
        workTask = null;
        currentWorkCorrelationId = null;
    }
}
|
|
|
|
// Starts a new background work item, first cancelling and awaiting any tracked one.
//
//   correlationId - attached to every frame the work item sends.
//   steps         - number of steps to run; the loop does not execute when this is < 1.
//   delayMs       - pause before each step, in milliseconds; negative values are
//                   treated as 0.
//   ct            - cancelling this token also cancels the work item (linked source).
//
// The work item sends: an "info" Log frame at start, a Progress frame after every
// step, a "debug" Log frame every max(1, steps / 5) steps, and a Result frame at the
// end. This method returns once the work has been scheduled; it does not wait for it.
async Task StartWorkAsync(string correlationId, int steps, int delayMs, CancellationToken ct)
{
    // Only one work item is tracked at a time.
    await CancelWorkAsync();

    // delayMs comes straight from the StartWork payload. Task.Delay throws for values
    // below -1 and waits indefinitely for -1, so clamp it rather than letting the work
    // item fault or hang on the first step.
    int stepDelayMs = Math.Max(0, delayMs);

    currentWorkCorrelationId = correlationId;
    workCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var wct = workCts.Token;

    workTask = Task.Run(async () =>
    {
        await SendAsync(new IpcFrame(
            Kind: IpcKinds.Log,
            CorrelationId: correlationId,
            Payload: IpcProtocol.ToJsonElement(new { level = "info", message = $"Child {childId}: work started ({steps} steps)." })), wct);

        for (int i = 1; i <= steps; i++)
        {
            wct.ThrowIfCancellationRequested();
            await Task.Delay(stepDelayMs, wct);

            double percent = (i * 100.0) / steps;
            await SendAsync(new IpcFrame(
                Kind: IpcKinds.Progress,
                CorrelationId: correlationId,
                Payload: IpcProtocol.ToJsonElement(new { step = i, total = steps, percent })), wct);

            // Milestone log every fifth of the run; Math.Max keeps the divisor at
            // least 1 when steps < 5.
            if (i % Math.Max(1, steps / 5) == 0)
            {
                await SendAsync(new IpcFrame(
                    Kind: IpcKinds.Log,
                    CorrelationId: correlationId,
                    Payload: IpcProtocol.ToJsonElement(new { level = "debug", message = $"Child {childId}: reached step {i}/{steps}." })), wct);
            }
        }

        await SendAsync(new IpcFrame(
            Kind: IpcKinds.Result,
            CorrelationId: correlationId,
            Payload: IpcProtocol.ToJsonElement(new { message = $"Child {childId}: work finished." })), wct);
    }, wct);
}
|
|
|
|
// Command loop: read frames from the parent until cancellation is requested or the
// parent disconnects, dispatching on the frame kind:
//   Ping       -> reply with Pong (same correlation id)
//   StartWork  -> start a background work item
//   CancelWork -> cancel the tracked work item and log that it was cancelled
//   other      -> reply with an Error frame naming the unknown kind
// Any tracked work item is cancelled on the way out, and the process exits with 0.
try
{
    while (!appCts.IsCancellationRequested)
    {
        IpcFrame? frame = await IpcProtocol.ReadFrameAsync(pipe, cancellationToken: appCts.Token);
        if (frame is null)
        {
            break; // parent disconnected
        }

        switch (frame.Kind)
        {
            case IpcKinds.Ping:
                await SendAsync(new IpcFrame(
                    Kind: IpcKinds.Pong,
                    CorrelationId: frame.CorrelationId,
                    Payload: IpcProtocol.ToJsonElement(new { childId = childId.Value })), appCts.Token);
                break;

            case IpcKinds.StartWork:
            {
                // A missing payload falls back to the record's default values; a missing
                // correlation id is replaced by a freshly generated one.
                var payload = IpcProtocol.FromJsonElement<StartWorkPayload>(frame.Payload) ?? new StartWorkPayload();
                string corr = frame.CorrelationId ?? Guid.NewGuid().ToString("N");
                await StartWorkAsync(corr, payload.Steps, payload.DelayMs, appCts.Token);
                break;
            }

            case IpcKinds.CancelWork:
            {
                // CancelWorkAsync resets currentWorkCorrelationId to null, so the fallback
                // id must be resolved before cancelling; reading it afterwards would
                // always yield null.
                string? cancelledCorrelationId = frame.CorrelationId ?? currentWorkCorrelationId;

                await CancelWorkAsync();
                await SendAsync(new IpcFrame(
                    Kind: IpcKinds.Log,
                    CorrelationId: cancelledCorrelationId,
                    Payload: IpcProtocol.ToJsonElement(new { level = "info", message = $"Child {childId}: work cancelled." })), appCts.Token);
                break;
            }

            default:
                await SendAsync(new IpcFrame(
                    Kind: IpcKinds.Error,
                    CorrelationId: frame.CorrelationId,
                    Payload: IpcProtocol.ToJsonElement(new { message = $"Unknown command kind '{frame.Kind}'." })), appCts.Token);
                break;
        }
    }
}
catch (OperationCanceledException)
{
    // shutting down
}
finally
{
    // Do not leave a work item running past the end of the loop.
    await CancelWorkAsync();
}

return 0;
|
|
|
|
/// <summary>
/// Parameters of a StartWork command, deserialized from the frame payload. A
/// default-constructed instance is used when the payload deserializes to null.
/// </summary>
/// <param name="Steps">Number of work steps to run; defaults to 25.</param>
/// <param name="DelayMs">Pause before each step, in milliseconds (passed to <c>Task.Delay</c>); defaults to 150.</param>
file sealed record StartWorkPayload(int Steps = 25, int DelayMs = 150);
|