using System.Buffers; using System.Text.Json; namespace CommIpc; public static class IpcProtocol { // Keep the prototype safe from accidental runaway memory usage. public const int DefaultMaxFrameBytes = 4 * 1024 * 1024; // 4 MiB public static async Task WriteFrameAsync(Stream stream, IpcFrame frame, CancellationToken cancellationToken = default) { // Always include a timestamp if the sender didn't set one. if (frame.Timestamp is null) { frame = frame with { Timestamp = DateTimeOffset.UtcNow }; } byte[] json = JsonSerializer.SerializeToUtf8Bytes(frame, IpcJson.Options); byte[] header = new byte[4]; int len = json.Length; header[0] = (byte)(len & 0xFF); header[1] = (byte)((len >> 8) & 0xFF); header[2] = (byte)((len >> 16) & 0xFF); header[3] = (byte)((len >> 24) & 0xFF); await stream.WriteAsync(header, cancellationToken).ConfigureAwait(false); await stream.WriteAsync(json, cancellationToken).ConfigureAwait(false); await stream.FlushAsync(cancellationToken).ConfigureAwait(false); } public static async Task ReadFrameAsync(Stream stream, int maxFrameBytes = DefaultMaxFrameBytes, CancellationToken cancellationToken = default) { byte[] header = ArrayPool.Shared.Rent(4); try { int headerRead = await ReadExactOrEofAsync(stream, header, 0, 4, cancellationToken).ConfigureAwait(false); if (headerRead == 0) { return null; // clean EOF } if (headerRead != 4) { throw new EndOfStreamException("Unexpected end of stream while reading frame header."); } int len = header[0] | (header[1] << 8) | (header[2] << 16) | (header[3] << 24); if (len < 0) { throw new InvalidDataException("Negative frame length."); } if (len == 0) { throw new InvalidDataException("Zero-length frame."); } if (len > maxFrameBytes) { throw new InvalidDataException($"Frame too large: {len} bytes (limit {maxFrameBytes})."); } byte[] payload = ArrayPool.Shared.Rent(len); try { int read = await ReadExactOrEofAsync(stream, payload, 0, len, cancellationToken).ConfigureAwait(false); if (read != len) { throw new EndOfStreamException("Unexpected end of stream while reading frame payload."); } // Deserialize from the rented buffer slice. return JsonSerializer.Deserialize(payload.AsSpan(0, len), IpcJson.Options); } finally { ArrayPool.Shared.Return(payload); } } finally { ArrayPool.Shared.Return(header); } } public static JsonElement ToJsonElement(T value) { using JsonDocument doc = JsonDocument.Parse(JsonSerializer.SerializeToUtf8Bytes(value, IpcJson.Options)); return doc.RootElement.Clone(); } public static T? FromJsonElement(JsonElement? element) { if (element is null) { return default; } return element.Value.Deserialize(IpcJson.Options); } private static async Task ReadExactOrEofAsync(Stream stream, byte[] buffer, int offset, int count, CancellationToken cancellationToken) { int total = 0; while (total < count) { int n = await stream.ReadAsync(buffer.AsMemory(offset + total, count - total), cancellationToken).ConfigureAwait(false); if (n == 0) { return total; // EOF } total += n; } return total; } }