Lev's blog

Working with the Postgres protocol

PostgreSQL clients and servers talk to each other via TCP. TCP is a streaming protocol, which means that data sent over the socket isn't delineated in any way: it's all just a bunch of bytes.

Clients and servers deal with this limitation by expecting specific bytes to arrive at a certain time in the life of a connection. They are asynchronous state machines, which means they can send (and receive) data at any time without having to wait for a response, as long as data is sent (and received) in the right order.

All examples in this article use Rust, Tokio and the Bytes crate. You can test them locally by creating your own PostgreSQL server:

use tokio::io::*;
use tokio::net::*;
use bytes::*;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:6432").await?;
    let (mut stream, _addr) = listener.accept().await?;
    
    Ok(())
}

Any Postgres client can connect to it, including psql:

psql -h 127.0.0.1 -p 6432

Messages

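Postgres has two kinds of messages that clients and servers expect to receive: startup messages, sent by the client when a connection is first established, and regular messages, used for everything after that.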

Startup message

The startup message starts with a 32-bit signed integer. This number indicates the length of data (including itself) the receiver should expect to read from the socket.

let len = stream.read_i32().await?;

Message length

This field merits its own article, but I'll summarize a couple of important details here.

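First of all, it's a good idea to check that this number is within a sane range before using it: not smaller than the length field itself, and not too large. If your PostgreSQL server is facing the public Internet, someone could easily DoS it by announcing a huge message and making your server allocate a buffer that big.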

Furthermore, don't forget that the length includes itself, so subtract 4 (or to be perfectly correct, std::mem::size_of::<i32>()) before allocating a buffer:

let len = (len - std::mem::size_of::<i32>() as i32) as usize;

I've made this mistake countless times and ended up with a server stuck waiting for 4 bytes of data that never arrived.
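Both checks fit in one small function. Here is a minimal std-only sketch, assuming a hypothetical MAX_MESSAGE_LEN cap (the 1 MiB value is an arbitrary choice for illustration, not a PostgreSQL limit). It rejects lengths smaller than the length field or larger than the cap, and only then subtracts 4 to get the number of bytes left to read:

```rust
use std::io;

/// Hypothetical upper bound on a single message; pick a value that fits your server.
const MAX_MESSAGE_LEN: i32 = 1024 * 1024;

/// Validate a length read from the wire and return the number of bytes
/// still to be read (the length field counts itself, so subtract 4).
fn payload_len(len: i32) -> io::Result<usize> {
    let header = std::mem::size_of::<i32>() as i32;
    if len < header || len > MAX_MESSAGE_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("invalid message length: {}", len),
        ));
    }
    Ok((len - header) as usize)
}
```

Returning an error instead of panicking lets the server drop only the offending connection.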

Read the rest

Reading the rest of the message should ideally be done in one memory allocation:

// Allocate memory once.
let mut buf = BytesMut::with_capacity(len);

// Fill the buffer with zeros so it has `len` initialized bytes to read into.
buf.resize(len, 0u8);

// Fill the entire buffer with data from the socket.
// This waits (without blocking the thread) until all `len` bytes arrive.
stream.read_exact(&mut buf).await?;

Reducing memory allocations is a good idea early on. If your server talks to a lot of clients, a substantial amount of CPU time can be wasted (re)allocating memory.

Note on BytesMut

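We are using BytesMut rather than Vec because, once it's filled, a BytesMut can be frozen into Bytes: an immutable buffer whose memory is tracked with an atomic reference count. Cloning Bytes only bumps that count, so we can clone it as many times as we want without copying the data. (Cloning a BytesMut, on the other hand, does copy it.)

// Freeze into Bytes; this doesn't copy the data.
let mut buf = buf.freeze();
buf.clone().clone().clone(); // No data copied!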

Memory is a precious resource in network servers and it shouldn't be wasted.

Startup message kinds

Let's get back to working with Postgres messages. Right after the length, the startup message contains an identifier, another 32-bit signed integer, which tells us what kind of message it is:

Identifier | Message | Description
196608 | StartupMessage | Startup parameters like user and database, plus optional ones like statement_timeout, search_path, etc.
80877103 | SSLRequest | Client wants to use TLS (a.k.a. SSL).
80877102 | CancelRequest | Client wants to cancel a query currently executing on the server.

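let identifier = buf.get_i32();

match identifier {
    196608 => { /* Handle StartupMessage (protocol version 3.0). */ }
    80877103 => { /* Negotiate a TLS connection. */ }
    80877102 => { /* Handle query cancellation request. */ }
    // Don't panic on untrusted input: reject the connection instead.
    _ => {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("unknown startup identifier: {}", identifier),
        ))
    }
}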
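The three magic numbers are less arbitrary than they look: per the protocol documentation, this field is a protocol version number, with the major version in the high 16 bits and the minor version in the low 16 bits. 196608 is version 3.0, while SSLRequest and CancelRequest use the reserved major version 1234. A std-only sketch (the enum and function names are hypothetical) that decodes the field that way:

```rust
/// Kinds of startup packets covered in this article.
#[derive(Debug, PartialEq)]
enum StartupKind {
    Startup { major: u16, minor: u16 },
    SslRequest,
    CancelRequest,
}

/// Classify the 32-bit code that follows the length of a startup packet.
/// High 16 bits: major version; low 16 bits: minor version.
fn classify(code: i32) -> Option<StartupKind> {
    let major = (code >> 16) as u16;
    let minor = (code & 0xFFFF) as u16;
    match (major, minor) {
        (1234, 5679) => Some(StartupKind::SslRequest),
        (1234, 5678) => Some(StartupKind::CancelRequest),
        // Accept any 3.x minor version here; a real server may be stricter.
        (3, _) => Some(StartupKind::Startup { major, minor }),
        _ => None,
    }
}
```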

Right about now is a good time to mention that I'm not writing this from memory; protocol specifics are documented thoroughly on postgresql.org.

Ignoring the other two for now, let's take a look at what's inside a regular StartupMessage. According to the specification, after the identifier we should expect a key/value mapping of startup parameter names and values, terminated by a single zero byte.

Reading strings

Both names and values are encoded as C-style strings, which means they are sequences of bytes terminated by an ASCII NUL character (\0). Reading one from a buffer deserves its own function:

/// Read a C-style string from a buffer, advancing
/// the buffer's cursor past the string and its
/// NUL terminator.
pub fn c_string_buf(buf: &mut impl Buf) -> Vec<u8> {
    let mut result = Vec::new();
    while buf.remaining() > 0 {
        let c = buf.get_u8();

        if c != 0 {
            result.push(c);
        } else {
            break;
        }
    }

    result
}

This function is quite neat since it won't panic if the buffer runs out of bytes before it finds a string terminator. If you don't handle this condition, an attacker can crash your server by sending it an unterminated C string.
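One trade-off: when the terminator is missing, c_string_buf returns whatever it read so far, so a truncated string looks just like a complete one. If you'd rather treat that as a protocol error, here is a hypothetical std-only variant over a byte slice that returns None unless it finds the NUL byte, and only then advances the slice past it:

```rust
/// Read a NUL-terminated string from the front of `buf`, advancing `buf`
/// past the terminator. Returns `None` (leaving `buf` untouched) if no
/// terminator is present.
fn read_c_string<'a>(buf: &mut &'a [u8]) -> Option<&'a [u8]> {
    let data: &'a [u8] = *buf;
    let end = data.iter().position(|&b| b == 0)?;
    *buf = &data[end + 1..]; // Skip past the NUL terminator.
    Some(&data[..end])
}
```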

Rust is memory-safe, so you won't leak passwords because of a buffer overflow, but you sure can crash your server by not paying close attention to data coming from the network.

Let's read our startup parameters and see who's connecting to our server:

use std::collections::HashMap;

let mut params = HashMap::new();
loop {
    let key = c_string_buf(&mut buf);
    if key.is_empty() { break; } // Reached the end of the key/value map.
    // Convert key to a UTF-8 string (empty if it isn't valid UTF-8).
    let key = String::from_utf8(key).unwrap_or_default();

    // Same for the value.
    let value = c_string_buf(&mut buf);
    let value = String::from_utf8(value).unwrap_or_default();

    params.insert(key, value);
}

// Let's see what we got.
println!("{:#?}", params);

I made a pretty big assumption here that keys and values are valid UTF-8 strings (anything that isn't is silently replaced with an empty string). PostgreSQL actually supports many different encodings and your clients could be using a different one. Parsing encodings is worthy of its own book, so let's just go ahead with UTF-8 for now.

Launch the server

If you're still with me, thanks for sticking with it through pages of theory. I had to do the same when building PgDog, so I know the feeling.

Let's start our server and connect to it with psql:

# Start the server. You'll see no output
# since the server is waiting for a connection.
cargo run

# In a separate terminal window:
psql "postgres://127.0.0.1:6432/?sslmode=disable"

Note that the connection string I'm using explicitly disables SSL (TLS). We aren't handling TLS at the moment, and by default (sslmode=prefer) psql tries to connect with TLS first.

If everything goes well, the server will print out something like this:

{
    "database": "lev",
    "client_encoding": "UTF8",
    "application_name": "psql",
    "user": "lev",
}

Great! I guessed correctly and the client did use UTF-8 encoding to talk to us. If we wanted to look at the bytes we received, they would look something like this:

\x00\x00\x00J | \x00\x03\x00\x00 | user\0lev\0database\0lev\0client_encoding\0UTF8\0application_name\0psql\0\0

The | character is just there to visually delineate fields and isn't actually part of the protocol. Integers use big-endian encoding (also called network byte order). Decoding them into Rust's i32 is handled for us by the bytes and tokio crates: both get_i32 and read_i32 read big-endian integers.
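To double-check the byte dump above, here is a std-only sketch (the function names are hypothetical) that parses it end to end. i32::from_be_bytes does the same big-endian decoding that read_i32 and get_i32 do for us: \x00\x00\x00J decodes to 74 ('J' is 0x4A), which is exactly the size of the whole packet, and \x00\x03\x00\x00 decodes to 196608.

```rust
use std::collections::HashMap;

/// Decode a big-endian i32 from the first 4 bytes of `b`, if present.
fn be_i32(b: &[u8]) -> Option<i32> {
    let b = b.get(..4)?;
    Some(i32::from_be_bytes([b[0], b[1], b[2], b[3]]))
}

/// Parse a complete StartupMessage (length, protocol version, parameters).
/// Invalid UTF-8 is replaced with U+FFFD rather than rejected.
fn parse_startup(packet: &[u8]) -> Option<(i32, HashMap<String, String>)> {
    // The length counts itself, so it must match the whole packet.
    let len = be_i32(packet)?;
    if len < 8 || len as usize != packet.len() {
        return None;
    }
    let version = be_i32(&packet[4..])?;

    let mut params = HashMap::new();
    let mut rest = &packet[8..];
    loop {
        // Names and values are NUL-terminated; an empty name ends the list.
        let key_end = rest.iter().position(|&b| b == 0)?;
        if key_end == 0 {
            break;
        }
        let key = String::from_utf8_lossy(&rest[..key_end]).into_owned();
        rest = &rest[key_end + 1..];

        let value_end = rest.iter().position(|&b| b == 0)?;
        let value = String::from_utf8_lossy(&rest[..value_end]).into_owned();
        rest = &rest[value_end + 1..];

        params.insert(key, value);
    }
    Some((version, params))
}
```

Unlike the loop earlier in the article, this sketch uses String::from_utf8_lossy, which keeps invalid input visible as replacement characters instead of turning it into an empty string.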

Regular protocol

At this point, we've read the startup message, and from now on both sides exchange regular PostgreSQL protocol messages.

All messages from now on will follow the same format:

Field | Length | Description
Identifier | 1 byte | Single ASCII character identifying a particular message.
Length | 4 bytes | 32-bit signed integer describing the length of the message (including itself, but not the identifier).
Payload | Variable | Data, formatted according to the message specification.
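Because TCP doesn't delimit messages, a single read may return half a message or several at once; this fixed header is what lets you find the boundaries. A std-only sketch (the function name is hypothetical) that splits one message off the front of a byte slice, remembering that the length covers itself and the payload but not the 1-byte identifier:

```rust
/// Try to split one regular message off the front of `data`.
/// Returns (identifier, payload, remaining bytes), or None if `data`
/// doesn't yet hold a complete message or the length is invalid.
fn split_message(data: &[u8]) -> Option<(u8, &[u8], &[u8])> {
    if data.len() < 5 {
        return None; // Need at least the identifier and the length.
    }
    let len = i32::from_be_bytes([data[1], data[2], data[3], data[4]]);
    if len < 4 {
        return None; // A real server should treat this as a protocol error.
    }
    let total = 1 + len as usize; // The identifier isn't counted in `len`.
    if data.len() < total {
        return None; // Wait for more bytes.
    }
    Some((data[0], &data[5..total], &data[total..]))
}
```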

To make it easier, let's define a struct:

#[derive(Debug)]
pub struct Message {
    identifier: char,
    len: usize, // Length will always be >= 0.
    payload: Bytes,
}

Knowing the format of our message, let's implement reading it from a socket. The code should look pretty familiar:

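impl Message {
    /// Read a message from a stream.
    pub async fn read(mut stream: impl AsyncRead + Unpin) -> io::Result<Self> {
        let identifier = stream.read_u8().await? as char;
        let len = stream.read_i32().await?;

        // The length includes itself, so anything below 4 is invalid.
        // Without this check, a negative value would wrap around when cast
        // to usize and `len - 4` could underflow. (Add an upper bound too.)
        if len < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("invalid message length: {}", len),
            ));
        }
        let len = len as usize;

        // Pre-allocate memory for the payload.
        let mut buf = BytesMut::with_capacity(len - 4);
        buf.resize(len - 4, 0u8);
        stream.read_exact(&mut buf).await?;

        Ok(Self {
            identifier,
            len,
            payload: buf.freeze(),
        })
    }
}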

Now is as good a time as any to talk about buffering and performance.

Buffering sockets

You may have noticed that in the previous example, we read 3 values from the stream: a u8 (1 byte), an i32 (4 bytes), and len - 4 bytes of payload. What isn't obvious is that this takes at least 3 syscalls (and possibly more, since read_exact keeps calling read until the buffer is full).

Switching between userspace and the kernel on every syscall is relatively expensive, so applications that read small amounts of data directly from the socket will suffer from poor performance.

To avoid this issue, Tokio comes with a buffered stream, BufStream. Whenever its userspace buffer is empty, it reads as much data as is available in the socket's kernel buffer (up to the buffer's capacity) in a single call. Subsequent reads are served from that userspace buffer instead of going to the kernel.

Therefore, as soon as you obtain a socket from a call to TcpListener::accept, place it inside a BufStream:

let mut stream = BufStream::new(stream);

All other code we wrote stays the same, since BufStream implements the same AsyncRead (and AsyncWrite) traits we've been using this entire time.
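The effect is easier to see with a counter. Tokio's BufStream needs a live socket, so this sketch swaps in a synchronous analogue, std's BufReader (not Tokio's API). A hypothetical CountingReader counts calls to read, which stand in for syscalls, while reading ten 6-byte messages using three separate reads per message, the same pattern as Message::read:

```rust
use std::io::{self, BufReader, Read};

/// Wraps a reader and counts calls to `read`, standing in for
/// the number of `read` syscalls made on a real socket.
struct CountingReader<R> {
    inner: R,
    reads: usize,
}

impl<R: Read> Read for CountingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.reads += 1;
        self.inner.read(buf)
    }
}

/// Read one message like `Message::read`: identifier, length, payload.
fn read_message(r: &mut impl Read) -> io::Result<(u8, Vec<u8>)> {
    let mut id = [0u8; 1];
    r.read_exact(&mut id)?;
    let mut len = [0u8; 4];
    r.read_exact(&mut len)?;
    let len = i32::from_be_bytes(len);
    if len < 4 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid length"));
    }
    let mut payload = vec![0u8; len as usize - 4];
    r.read_exact(&mut payload)?;
    Ok((id[0], payload))
}

/// Read `count` messages from `data`; return how many times the
/// underlying reader was called, with or without a BufReader in between.
fn count_reads(data: &[u8], count: usize, buffered: bool) -> io::Result<usize> {
    let inner = CountingReader { inner: data, reads: 0 };
    if buffered {
        let mut r = BufReader::new(inner);
        for _ in 0..count {
            read_message(&mut r)?;
        }
        Ok(r.get_ref().reads)
    } else {
        let mut r = inner;
        for _ in 0..count {
            read_message(&mut r)?;
        }
        Ok(r.reads)
    }
}
```

With ten copies of b"Z\x00\x00\x00\x05I" as input, the unbuffered version calls read 30 times (once per field per message), while the buffered one pulls all 60 bytes in with a single call.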

With the performance issue solved, let's get back to reading and writing Postgres protocol messages.

Authentication

At this point, psql is expecting an authentication challenge. A real PostgreSQL server would typically ask the client to authenticate using SCRAM-SHA-256 or an MD5-hashed password, but since these fairly complex algorithms are out of scope for now, let's just respond with AuthenticationOk, which tells the client that authentication (even if there wasn't any) was successful.

As promised, this message (and all others we'll be working with) follows the standard Postgres protocol format:

Field | Value
Identifier | 'R'
Length | \x00\x00\x00\x08 (8 bytes)
Payload | \x00\x00\x00\x00 (0)

To make it easier, let's define a struct for this message and a conversion into our generic protocol Message struct:

#[derive(Debug)]
pub struct AuthenticationOk; // No fields.

impl From<AuthenticationOk> for Message {
    fn from(_: AuthenticationOk) -> Self {
        let mut payload = BytesMut::new();
        payload.put_i32(0);

        Self {
            identifier: 'R',
            len: 8,
            payload: payload.freeze(),
        }
    }
}
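The conversion above hardcodes len: 8, which is easy to get out of sync if the payload changes. One alternative is to derive the length from the payload. A std-only sketch with hypothetical helper names that does this and reproduces the bytes from the table above:

```rust
/// Encode a regular protocol message: 1-byte identifier, 4-byte big-endian
/// length (counting itself and the payload, not the identifier), payload.
fn encode_message(identifier: u8, payload: &[u8]) -> Vec<u8> {
    let len = (payload.len() + 4) as i32;
    let mut out = Vec::with_capacity(1 + 4 + payload.len());
    out.push(identifier);
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// AuthenticationOk: identifier 'R' with an Int32 payload of 0.
fn authentication_ok() -> Vec<u8> {
    encode_message(b'R', &0i32.to_be_bytes())
}
```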

Everything looks good. To send this message to the client, we need a way to write our Message struct to a stream:

impl Message {
    /// Write message to stream.
    pub async fn write(&self, mut stream: impl AsyncWrite + Unpin) -> io::Result<()> {
        stream.write_u8(self.identifier as u8).await?;
        stream.write_i32(self.len as i32).await?;
        stream.write_all(&self.payload).await?;

        Ok(())
    }
}

We are no longer worried about using too many syscalls, but since the socket is also buffered for writes, we need to make sure the data we wrote actually gets sent to the client. BufStream collects writes in a fixed-size buffer, and until that buffer fills up, no data will be sent unless we flush it:

stream.flush().await?;

My recommendation is to avoid flushing the buffer until you're expecting a response from the client. Buffering multiple messages improves the performance of your network server dramatically, which you'll notice even when testing on your local machine.
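The same trade-off applies to writes. This synchronous sketch uses std's BufWriter as a stand-in for BufStream (not Tokio's API) and a hypothetical CountingWriter that counts write calls. It sends the three messages this article's server sends during startup (AuthenticationOk, BackendKeyData with a made-up pid and secret, and ReadyForQuery), once unbuffered and once buffered with a single flush:

```rust
use std::io::{self, BufWriter, Write};

/// Wraps a writer and counts calls to `write`, standing in for `write` syscalls.
struct CountingWriter<W> {
    inner: W,
    writes: usize,
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes += 1;
        self.inner.write(buf)
    }
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

/// Write one message like `Message::write`: identifier, length, payload.
fn write_message(w: &mut impl Write, id: u8, payload: &[u8]) -> io::Result<()> {
    w.write_all(&[id])?;
    w.write_all(&((payload.len() + 4) as i32).to_be_bytes())?;
    w.write_all(payload)
}

/// Send the startup reply, then flush once. Returns the bytes sent and
/// the number of `write` calls that reached the inner writer.
fn send_startup_reply(buffered: bool) -> io::Result<(Vec<u8>, usize)> {
    let messages: [(u8, &[u8]); 3] = [
        (b'R', &[0, 0, 0, 0]),             // AuthenticationOk
        (b'K', &[0, 0, 0, 1, 0, 0, 0, 2]), // BackendKeyData (pid 1, secret 2)
        (b'Z', b"I"),                      // ReadyForQuery, idle
    ];
    let mut inner = CountingWriter { inner: Vec::<u8>::new(), writes: 0 };
    if buffered {
        let mut w = BufWriter::new(&mut inner);
        for (id, payload) in messages {
            write_message(&mut w, id, payload)?;
        }
        w.flush()?;
    } else {
        for (id, payload) in messages {
            write_message(&mut inner, id, payload)?;
        }
        inner.flush()?;
    }
    Ok((inner.inner, inner.writes))
}
```

Both variants produce the same 28 bytes; the unbuffered one calls write 9 times (three fields per message), and the buffered one calls it once, on flush.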

In the meantime, let's go ahead and tell the client that authentication is not necessary:

Message::from(AuthenticationOk).write(&mut stream).await?;

Getting to the finish line

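With authentication over, the client expects the server to identify itself and provide information about the connection. This involves sending at least 2 more messages: BackendKeyData, which uniquely identifies the connection (clients use it later to cancel queries), and ReadyForQuery, which tells the client the server is ready to accept a query.

After reading the spec, the definitions for both are easy to implement: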

/// Unique identifier for a connection.
#[derive(Debug)]
pub struct BackendKeyData {
    pub pid: i32,
    pub secret: i32,
}

impl BackendKeyData {
    /// Generate a unique identifier
    /// using a secure random number generator.
    pub fn new() -> Self {
        use rand::Rng;

        Self {
            pid: rand::thread_rng().gen(),
            secret: rand::thread_rng().gen(),
        }
    }
}

impl From<BackendKeyData> for Message {
    fn from(value: BackendKeyData) -> Self {
        let mut payload = BytesMut::new();
        payload.put_i32(value.pid);
        payload.put_i32(value.secret);

        Self {
            identifier: 'K',
            len: 12,
            payload: payload.freeze(),
        }
    }
}

/// Message indicating the server is ready
/// to execute a query.
#[derive(Debug)]
pub struct ReadyForQuery {
    state: char,
}

impl ReadyForQuery {
    /// Server is Idle, ready for the next transaction.
    pub fn new() -> Self {
        Self { state: 'I' }
    }
}

impl From<ReadyForQuery> for Message {
    fn from(value: ReadyForQuery) -> Self {
        let mut payload = BytesMut::new();
        payload.put_u8(value.state as u8);

        Self {
            identifier: 'Z',
            len: 5,
            payload: payload.freeze(),
        }
    }
}
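ReadyForQuery's single payload byte is more than a formality: per the protocol documentation, it reports the transaction status, 'I' when idle, 'T' inside a transaction block, and 'E' inside a failed transaction block. A hypothetical enum, sketched here with std only, keeps invalid states out of the struct instead of storing a bare char:

```rust
/// Transaction status reported in ReadyForQuery (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum TransactionStatus {
    /// Not in a transaction block.
    Idle,
    /// Inside a transaction block.
    InTransaction,
    /// Inside a failed transaction block; queries are rejected until it ends.
    Failed,
}

impl TransactionStatus {
    fn as_byte(self) -> u8 {
        match self {
            TransactionStatus::Idle => b'I',
            TransactionStatus::InTransaction => b'T',
            TransactionStatus::Failed => b'E',
        }
    }
}

/// Encode a complete ReadyForQuery message: 'Z', length 5, status byte.
fn ready_for_query(status: TransactionStatus) -> [u8; 6] {
    [b'Z', 0, 0, 0, 5, status.as_byte()]
}
```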

Let's go ahead and send both of these to the client and finish connection initialization:

Message::from(BackendKeyData::new())
    .write(&mut stream)
    .await?;
Message::from(ReadyForQuery::new())
    .write(&mut stream)
    .await?;

// Don't forget to flush the buffers!
stream.flush().await?;

If you run your server now and connect to it with psql, you should get a query prompt:

psql (16.4 (Postgres.app), server 0.0.0)
WARNING: psql major version 16, server major version 0.0.
         Some psql features might not work.
Type "help" for help.

lev=>

Closing thoughts

In this article, we learned how to work with a TCP-based protocol, read its specification, and implement a basic PostgreSQL-like server. Network programming is a fun way to interact with complex software written by other people, and getting it to work gives me, at least, that "aha!" moment.

Variations of this code are used directly in PgDog, a network proxy for automatically sharding Postgres.

In the next article, we'll take a look at how to build a state machine that simulates, from a networking perspective, a fully functional, asynchronous, PostgreSQL server.
