System.IO.Pipelines are said to be fast.

Microsoft's documentation is something to behold: amazingly extensive and paradoxically hard to gather information from. So here is my straight-to-the-point rundown of C# System.IO.Pipelines.

Screenshot taken from the YouTube video “High performance IO with System.IO.Pipelines”.

The Code

To use System.IO.Pipelines you first have to have a connected socket:

using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;

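// IP and DNS work
var ipHostInfo = await Dns.GetHostEntryAsync(this.Options.Host).ConfigureAwait(false);
// Use the first resolved address; a hard-coded index of 1 throws when the host resolves to a single address.
var ipAddress = ipHostInfo.AddressList[0];
IPEndPoint ipEndPoint = new(ipAddress, this.Options.Port);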

// Initialize the socket and call `ConnectAsync`
this.socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
await this.socket.ConnectAsync(ipEndPoint).ConfigureAwait(false);

// Validation & Checks: test for null before touching the socket
if (this.socket == null || !this.socket.Connected)
{
    throw new InvalidOperationException("Failed to connect socket");
}

With that, you can then initialize a Stream and the Pipeline.

// Setup the Pipeline
this.stream = new NetworkStream(this.socket);
this.reader = PipeReader.Create(this.stream);
this.writer = PipeWriter.Create(this.stream);
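
PipeReader.Create and PipeWriter.Create accept any Stream, not just a NetworkStream, which makes it easy to try the setup without a server. Here is a minimal sketch that swaps in a MemoryStream as a stand-in for the socket; the two bytes written are an MQTT PINGREQ, chosen only for illustration.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;

// Assumption: a MemoryStream stands in for the NetworkStream used in the post.
var stream = new MemoryStream();

// leaveOpen keeps the MemoryStream usable after the writer completes.
var writer = PipeWriter.Create(stream, new StreamPipeWriterOptions(leaveOpen: true));
await writer.WriteAsync(new byte[] { 0xC0, 0x00 });
await writer.CompleteAsync();

// Rewind so the reader starts at the bytes that were just written.
stream.Position = 0;
var reader = PipeReader.Create(stream);

ReadResult result = await reader.ReadAsync();
byte[] received = result.Buffer.ToArray();

// Mark everything as consumed, then release the reader (this also disposes the stream).
reader.AdvanceTo(result.Buffer.End);
await reader.CompleteAsync();

Console.WriteLine(BitConverter.ToString(received));
```

With a real connection the reader and writer share one NetworkStream and no rewinding is needed; the rewind here is only an artifact of using a single in-memory stream for both directions.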

The Coding Patterns

We now have Pipelines set up and ready to be used. To read from and write to the Pipeline, use the following patterns:

var writeResult = await this.writer.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false);
ReadResult readResult = await this.reader.ReadAsync().ConfigureAwait(false);

The received data is in readResult.Buffer, which is of type ReadOnlySequence<byte>.
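
One thing the two-liner above doesn't show: every ReadAsync has to be followed by an AdvanceTo call that tells the pipe how much of the buffer was consumed, otherwise the next ReadAsync throws. Below is a rough sketch of a read loop, assuming an in-memory Pipe in place of the socket-backed reader and writer; the five bytes are a hypothetical CONNACK packet.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;

// Assumption: an in-memory Pipe stands in for the socket-backed reader and writer.
var pipe = new Pipe();
await pipe.Writer.WriteAsync(new byte[] { 0x20, 0x03, 0x00, 0x00, 0x00 });
await pipe.Writer.CompleteAsync();

long totalBytes = 0;

while (true)
{
    ReadResult readResult = await pipe.Reader.ReadAsync();
    ReadOnlySequence<byte> buffer = readResult.Buffer;

    // A real client would decode packets here; this sketch only counts bytes.
    totalBytes += buffer.Length;

    // Report the whole buffer as consumed so the pipe can release the memory.
    pipe.Reader.AdvanceTo(buffer.End);

    // IsCompleted means the writer is done and no more data will arrive.
    if (readResult.IsCompleted)
    {
        break;
    }
}

await pipe.Reader.CompleteAsync();
Console.WriteLine(totalBytes);
```

If only part of the buffer holds a complete packet, AdvanceTo also has a two-argument overload (consumed, examined), so the unparsed remainder comes back on the next read.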

Here is an example from the HiveMQ C# MQTT Client decoding an incoming MQTT CONNACK packet.

// Called with Decode(readResult.Buffer)
public void Decode(ReadOnlySequence<byte> packetData)
{
    var packetLength = packetData.Length;
    var reader = new SequenceReader<byte>(packetData);

    // Skip past the Fixed Header
    reader.Advance(2);

    if (reader.TryRead(out var ackFlags))
    {
        this.SessionPresent = (ackFlags & 0x1) == 0x1;
    }

    if (reader.TryRead(out var reasonCode))
    {
        this.ReasonCode = (ConnAckReasonCode)reasonCode;
    }

    // ...etc...
}
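
To play with SequenceReader<byte> outside of the client, here is a small standalone sketch of the same idea. The DecodeConnAck helper and the sample bytes are hypothetical, written for this post rather than taken from the HiveMQ client, and the length check is my own addition because Advance throws when asked to move past the end of the data.

```csharp
using System;
using System.Buffers;

// Hypothetical CONNACK bytes: fixed header (0x20, remaining length 3),
// acknowledge flags with session present set, reason code 0x00, property length 0.
byte[] raw = { 0x20, 0x03, 0x01, 0x00, 0x00 };
var packetData = new ReadOnlySequence<byte>(raw);

var decoded = DecodeConnAck(packetData);
Console.WriteLine($"SessionPresent={decoded.SessionPresent} ReasonCode={decoded.ReasonCode}");

// Illustrative helper, not part of any library.
static (bool SessionPresent, byte ReasonCode) DecodeConnAck(ReadOnlySequence<byte> data)
{
    var reader = new SequenceReader<byte>(data);

    // Guard first: Advance throws if fewer bytes remain than requested.
    if (reader.Remaining < 4)
    {
        throw new ArgumentException("Packet too short for a CONNACK", nameof(data));
    }

    // Skip past the two-byte fixed header.
    reader.Advance(2);

    reader.TryRead(out byte ackFlags);
    reader.TryRead(out byte reasonCode);

    // Bit 0 of the acknowledge flags is the session present flag.
    return ((ackFlags & 0x1) == 0x1, reasonCode);
}
```

SequenceReader<byte> is a ref struct, so it is kept inside a synchronous helper here instead of being held across an await.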

TLS: Transport Layer Security

How about if you want SSL/TLS?

Another example from the HiveMQ C# MQTT Client…


using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;

// Setup the stream
this.stream = new NetworkStream(this.socket);
if (this.Options.UseTLS)
{
    this.stream = new SslStream(this.stream, false, HiveMQClient.ValidateServerCertificate, null);
    await ((SslStream)this.stream).AuthenticateAsClientAsync(this.Options.Host).ConfigureAwait(false);
}

// Setup the Pipeline
this.reader = PipeReader.Create(this.stream);
this.writer = PipeWriter.Create(this.stream);


internal static bool ValidateServerCertificate(
        object sender,
        X509Certificate certificate,
        X509Chain chain,
        SslPolicyErrors sslPolicyErrors)
{
    if (sslPolicyErrors == SslPolicyErrors.None)
    {
        return true;
    }

    Console.WriteLine("Certificate error: {0}", sslPolicyErrors);

    // Do not allow this client to communicate with unauthenticated servers.
    return false;
}

I’ll update this post and examples from time to time as things evolve until it is complete. If you have any feedback or questions, feel free to get in touch!

MQTT is an extremely efficient binary protocol used to publish and consume messages. This is what we do at HiveMQ. Check out the HiveMQ C# MQTT client for more code examples using System.IO.Pipelines.
