115 lines
3.9 KiB
C#
115 lines
3.9 KiB
C#
using System.Buffers;
using System.Buffers.Binary;
using System.Text.Json;
|
|
|
|
namespace CommIpc;
|
|
|
|
/// <summary>
/// Length-prefixed JSON framing over a <see cref="Stream"/>. Each frame on the wire is a
/// 4-byte little-endian signed payload length followed by that many bytes of UTF-8 JSON
/// produced with <c>IpcJson.Options</c>.
/// </summary>
public static class IpcProtocol
{
    /// <summary>
    /// Default upper bound on a single frame's payload size; keeps the prototype safe from
    /// accidental runaway memory usage.
    /// </summary>
    public const int DefaultMaxFrameBytes = 4 * 1024 * 1024; // 4 MiB

    // Size in bytes of the length prefix that precedes every payload.
    private const int HeaderSize = sizeof(int);

    /// <summary>
    /// Serializes <paramref name="frame"/> to UTF-8 JSON and writes it to
    /// <paramref name="stream"/> as a length-prefixed frame, then flushes the stream.
    /// </summary>
    /// <param name="stream">Destination stream.</param>
    /// <param name="frame">Frame to send. If its timestamp is unset, a copy stamped with the current UTC time is sent instead.</param>
    /// <param name="cancellationToken">Token passed to the write and flush operations.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> or <paramref name="frame"/> is null.</exception>
    public static async Task WriteFrameAsync(Stream stream, IpcFrame frame, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(stream);
        ArgumentNullException.ThrowIfNull(frame);

        // Always include a timestamp if the sender didn't set one.
        if (frame.Timestamp is null)
        {
            frame = frame with { Timestamp = DateTimeOffset.UtcNow };
        }

        byte[] json = JsonSerializer.SerializeToUtf8Bytes(frame, IpcJson.Options);

        // Length prefix: payload byte count as a little-endian Int32.
        byte[] header = new byte[HeaderSize];
        BinaryPrimitives.WriteInt32LittleEndian(header, json.Length);

        await stream.WriteAsync(header, cancellationToken).ConfigureAwait(false);
        await stream.WriteAsync(json, cancellationToken).ConfigureAwait(false);
        await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Reads one length-prefixed frame from <paramref name="stream"/> and deserializes it.
    /// </summary>
    /// <param name="stream">Source stream.</param>
    /// <param name="maxFrameBytes">Largest payload length accepted; longer frames are rejected before any payload buffer is rented.</param>
    /// <param name="cancellationToken">Token passed to the read operations.</param>
    /// <returns>
    /// The deserialized frame, or <c>null</c> only when the stream ended before any header
    /// byte was read (clean EOF).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="EndOfStreamException">The stream ended part-way through a header or payload.</exception>
    /// <exception cref="InvalidDataException">
    /// The declared length is negative, zero, or above <paramref name="maxFrameBytes"/>, or the
    /// payload deserialized to null.
    /// </exception>
    public static async Task<IpcFrame?> ReadFrameAsync(Stream stream, int maxFrameBytes = DefaultMaxFrameBytes, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(stream);

        byte[] header = ArrayPool<byte>.Shared.Rent(HeaderSize);
        try
        {
            int headerRead = await ReadExactOrEofAsync(stream, header, 0, HeaderSize, cancellationToken).ConfigureAwait(false);
            if (headerRead == 0)
            {
                return null; // clean EOF
            }
            if (headerRead != HeaderSize)
            {
                throw new EndOfStreamException("Unexpected end of stream while reading frame header.");
            }

            // The rented array may be longer than requested; only its first four bytes are decoded.
            int len = BinaryPrimitives.ReadInt32LittleEndian(header);

            if (len < 0)
            {
                throw new InvalidDataException("Negative frame length.");
            }
            if (len == 0)
            {
                throw new InvalidDataException("Zero-length frame.");
            }
            if (len > maxFrameBytes)
            {
                throw new InvalidDataException($"Frame too large: {len} bytes (limit {maxFrameBytes}).");
            }

            byte[] payload = ArrayPool<byte>.Shared.Rent(len);
            try
            {
                int read = await ReadExactOrEofAsync(stream, payload, 0, len, cancellationToken).ConfigureAwait(false);
                if (read != len)
                {
                    throw new EndOfStreamException("Unexpected end of stream while reading frame payload.");
                }

                // Deserialize from the rented buffer slice (the array itself may be larger than len).
                IpcFrame? frame = JsonSerializer.Deserialize<IpcFrame>(payload.AsSpan(0, len), IpcJson.Options);

                // A null result here must not be returned: callers interpret null as clean EOF,
                // so a payload that deserializes to null is reported as malformed data instead.
                if (frame is null)
                {
                    throw new InvalidDataException("Frame payload deserialized to null.");
                }

                return frame;
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(payload);
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(header);
        }
    }

    /// <summary>
    /// Converts <paramref name="value"/> to a standalone <see cref="JsonElement"/> using
    /// <c>IpcJson.Options</c>.
    /// </summary>
    /// <typeparam name="T">Type of the value to serialize.</typeparam>
    /// <param name="value">Value to serialize.</param>
    /// <returns>A <see cref="JsonElement"/> holding the serialized form of <paramref name="value"/>.</returns>
    public static JsonElement ToJsonElement<T>(T value)
    {
        return JsonSerializer.SerializeToElement(value, IpcJson.Options);
    }

    /// <summary>
    /// Deserializes <paramref name="element"/> into <typeparamref name="T"/> using
    /// <c>IpcJson.Options</c>.
    /// </summary>
    /// <typeparam name="T">Target type.</typeparam>
    /// <param name="element">Element to deserialize; may be null.</param>
    /// <returns>The deserialized value, or <c>default</c> when <paramref name="element"/> is null.</returns>
    public static T? FromJsonElement<T>(JsonElement? element)
    {
        if (element is JsonElement present)
        {
            return present.Deserialize<T>(IpcJson.Options);
        }

        return default;
    }

    /// <summary>
    /// Reads from <paramref name="stream"/> until <paramref name="count"/> bytes have been
    /// stored in <paramref name="buffer"/> starting at <paramref name="offset"/>, or the
    /// stream reports end-of-stream.
    /// </summary>
    /// <returns>
    /// The number of bytes actually read: equal to <paramref name="count"/> on success,
    /// smaller if the stream ended first.
    /// </returns>
    private static async Task<int> ReadExactOrEofAsync(Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        int total = 0;
        while (total < count)
        {
            // A single ReadAsync may return fewer bytes than requested, so keep asking for the remainder.
            Memory<byte> remaining = buffer.AsMemory(offset + total, count - total);
            int n = await stream.ReadAsync(remaining, cancellationToken).ConfigureAwait(false);
            if (n == 0)
            {
                break; // EOF
            }

            total += n;
        }

        return total;
    }
}
|