
Sharding Postgres with logical replication

This is old news by now, but since PostgreSQL 10, we can use the replication protocol to sync individual tables between databases. Messages sent between databases describe table changes in a readable format, which also happens to provide enough information for us to split that data between shards.

This document contains the overall thesis on how this works. The implementation is currently being developed as an open source project.

The logical replication protocol has two stages:

  1. An initial copy of the existing table data
  2. Streaming of real time changes as they happen

Both stages use the COPY protocol [1], which we can read and modify in real time.

COPY protocol

On the surface, the COPY protocol has only one message: CopyData. Its format is deceptively simple and contains only 3 fields:

Field     Size      Contents
Name      1 byte    'd'
Size      4 bytes   Length of the message, including self.
Payload   Variable  The message data encoded as bytes.

When a table is copied, the server will read all data from the table and, for each row, send a CopyData message with its columns.

Let's use the ubiquitous "users" table as an example:

CREATE TABLE users (
    id BIGINT PRIMARY KEY,
    email VARCHAR NOT NULL UNIQUE
);

To make this clearer, let's add a couple rows:

INSERT INTO users VALUES (1, 'admin@example.com');
INSERT INTO users VALUES (2, 'user@example.com');

Now if we copy this table using COPY users TO STDOUT, the server will send the following CopyData messages:

d | \x00\x00\x00\x17 | 1\tadmin@example.com
d | \x00\x00\x00\x16 | 2\tuser@example.com

The length of the message, which includes itself, is encoded as a big-endian 32-bit signed integer. The payload uses the "text" encoding, with columns separated by the \t (tab) character. The column values themselves are just printed as text.
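To make the framing concrete, here is a minimal Python sketch of reading a stream of CopyData messages. The framing follows the table above; the stream handling and function name are illustrative, not PgDog's implementation.

import io
import struct

def copy_rows(stream):
    # Yield one text-format row (a list of column strings) per CopyData message.
    while True:
        tag = stream.read(1)
        if tag != b"d":  # anything else (e.g. CopyDone) ends the data stream
            return
        (size,) = struct.unpack("!i", stream.read(4))  # big-endian, includes itself
        payload = stream.read(size - 4)
        yield payload.rstrip(b"\n").decode().split("\t")

# The two rows from the example above, framed by hand:
raw = b"d" + struct.pack("!i", 4 + 19) + b"1\tadmin@example.com"
raw += b"d" + struct.pack("!i", 4 + 18) + b"2\tuser@example.com"
for row in copy_rows(io.BytesIO(raw)):
    print(row)  # ['1', 'admin@example.com'], then ['2', 'user@example.com']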

Table updates

Changes made to tables are sent in real time, with data encapsulated inside the payload of CopyData messages.

Logical replication has 6 messages [2] that we care about:

  1. Begin
  2. Relation
  3. Insert
  4. Update
  5. Delete
  6. Commit

The Begin message indicates the start of a transaction. It's important to note that all changes to tables are encapsulated inside transactions. Even if only one query is executed, it's still automatically placed inside a transaction.

This message contains the transaction identifier (also known as an Xid) and its position in the WAL (LSN). These can be used to locate the current position in the replication stream and to calculate replication lag.
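As a small illustration, the Begin message body can be decoded with a few lines of Python. The field layout (final LSN, commit timestamp in microseconds since 2000-01-01, Xid) comes from the message format documentation [2]; the helper below is only a sketch.

import struct

def parse_begin(body: bytes):
    # body is everything after the 'B' message type byte
    final_lsn, commit_ts, xid = struct.unpack("!QQI", body)
    # LSNs are conventionally printed as two hex halves, e.g. 0/16B3748
    lsn = f"{final_lsn >> 32:X}/{final_lsn & 0xFFFFFFFF:X}"
    return lsn, commit_ts, xid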

The Relation message describes the table that's being updated. It contains information like the table name, number of columns and their names and data types. This information is relevant to the subsequent Insert, Update and Delete messages.

Data changes

The Insert, Update and Delete messages are similar and contain a single row. This row is either inserted, updated, or deleted. In all 3 cases, the message will contain enough information to perform that action on a different database.

The row itself is encapsulated inside yet another message called TupleData, which stores it using the same "text" encoding used in COPY:

\x00\x02 | t | \x00\x00\x00\x01 | 1 | t | \x00\x00\x00\x11 | admin@example.com
\x00\x02 | t | \x00\x00\x00\x01 | 2 | t | \x00\x00\x00\x10 | user@example.com

In order of appearance, the fields are:

  1. The number of columns in the tuple

and for each column in the tuple:

  1. The kind of value ('t' for text, 'n' for null)
  2. The length of the value
  3. The value itself, encoded as text
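A Python sketch of decoding TupleData, following the field order above; only the text and null kinds are handled, and the sample input is the first row from the example.

import struct

def parse_tuple_data(buf: bytes):
    (ncols,) = struct.unpack_from("!H", buf, 0)  # number of columns
    offset, columns = 2, []
    for _ in range(ncols):
        kind = buf[offset:offset + 1]
        offset += 1
        if kind in (b"n", b"u"):  # NULL or unchanged TOAST value: no data follows
            columns.append(None)
            continue
        (length,) = struct.unpack_from("!i", buf, offset)  # length of the value
        offset += 4
        columns.append(buf[offset:offset + length].decode())
        offset += length
    return columns

row = (
    b"\x00\x02"                                             # two columns
    + b"t" + struct.pack("!i", 1) + b"1"                    # id
    + b"t" + struct.pack("!i", 17) + b"admin@example.com"   # email
)
print(parse_tuple_data(row))  # ['1', 'admin@example.com']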

By now you might be able to see that the logical replication protocol contains enough information for a carefully placed network proxy to read this data and split it into any number of shards, but more on this below.

Finally, the Commit message indicates the end of a transaction. Changes streamed beforehand should be written to a database atomically, especially if they span multiple rows or tables.

Shard replication

For both stages of logical replication, a network proxy is placed between the primary database and its shards. The proxy is configured to know the identity of each database (using its network address) and reads all messages sent between them.

Each shard creates a logical replication subscription to the primary and receives only the data it should, filtered by a hashing function.

                                                                             +------------+
                                                                             |            |
                                        +------ hash(CopyData) % 2 == 0 -----|  Shard 0   |
                                        |                                    |            |
                                        |                                    +------------+
+-----------+                     +-----+-----+                                      
|           |                     |           |                                      
|  Primary  |------ CopyData -----+   Proxy   |                                      
|           |                     |           |                                      
+-----------+                     +-----+-----+                                      
                                        |                                    +------------+
                                        |                                    |            |
                                        +------ hash(CopyData) % 2 == 1 -----|  Shard 1   |
                                                                             |            |
                                                                             +------------+
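For illustration, wiring this up could look roughly like the following. The hostnames, the proxy port, and the publication and subscription names are all made up; the point is only that each shard's CONNECTION string targets the proxy instead of the primary.

import psycopg2  # any Postgres client works; psycopg2 is just an example

# On the primary: publish the tables we want to shard.
primary = psycopg2.connect("host=primary dbname=app user=postgres")
primary.autocommit = True
with primary.cursor() as cur:
    cur.execute("CREATE PUBLICATION sharded_tables FOR TABLE users")

# On each shard: subscribe through the proxy, so it can filter rows by hash.
for shard in ("shard_0", "shard_1"):
    conn = psycopg2.connect(f"host={shard} dbname=app user=postgres")
    conn.autocommit = True  # CREATE SUBSCRIPTION can't run inside a transaction
    with conn.cursor() as cur:
        cur.execute(f"""
            CREATE SUBSCRIPTION {shard}_sub
            CONNECTION 'host=proxy port=6432 dbname=app user=postgres'
            PUBLICATION sharded_tables
        """)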

Shard COPY

To shard CopyData messages sent during the COPY stage, the proxy needs two parsers:

  1. A SQL parser, to understand the COPY query and extract the table and column names
  2. A CSV parser, to read the rows inside CopyData messages

For each table we replicate, Postgres will send a COPY <table> (<columns>) TO STDOUT query to the primary database. The proxy intercepts that query and, using pg_query, extracts the table name and columns. This allows it to understand the format for subsequent CopyData messages.

As messages start to arrive, the proxy reads them using a CSV parser, extracts the column configured for sharding (based on the table name) and calculates the shard number using a hashing function. If the row should go to the connected database, the message is forwarded as-is. If, on the other hand, the row should go to a different shard, the message is dropped.

Two conditions have to be true (and they are) for this to work:

  1. The receiver has no expectations on the number of rows it should receive
  2. CopyData messages contain exactly one row [3]
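Here is a rough Python sketch of that decision. The hash below is a stand-in (the actual proxy reuses Postgres' partitioning hash, discussed later), and the sharding column position is assumed to have come from parsing the COPY query.

import hashlib

def shard_for(value: str, shards: int) -> int:
    # Stand-in hash; the real proxy uses Postgres' hash partitioning function.
    digest = hashlib.sha1(value.encode()).digest()
    return int.from_bytes(digest[:8], "big") % shards

def keep_copy_row(payload: bytes, sharding_column: int, shards: int, my_shard: int) -> bool:
    # Forward the CopyData message if the row hashes to the connected shard.
    columns = payload.rstrip(b"\n").decode().split("\t")
    return shard_for(columns[sharding_column], shards) == my_shard

print(keep_copy_row(b"1\tadmin@example.com", 0, 2, 0))  # forwarded to one shard...
print(keep_copy_row(b"1\tadmin@example.com", 0, 2, 1))  # ...and dropped for the other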

Shard table updates

To shard table update messages (Insert, Update, Delete), we have to write a custom parser. Since the protocol is well documented, that's pretty easy.

As replication messages are sent through the proxy, it builds its own internal state of the replication stream. For each Relation message, it stores the table OID and schema in a hash table. As table changes arrive, the proxy can inspect their contents, find the sharding column and calculate the shard using the same hashing function. If the table update belongs to the connected database, it's forwarded as-is. Otherwise, it's dropped.
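Building on the sketches above (parse_tuple_data and shard_for), the Insert case could look like this. The Insert body layout (relation OID, an 'N' marker, then the new tuple) follows the message format documentation [2]; the bookkeeping is simplified to a single dict.

import struct

relations = {}  # relation OID -> position of the sharding column for that table

def handle_relation(oid: int, sharding_column: int):
    # In the proxy this is filled in from the Relation message plus its own config.
    relations[oid] = sharding_column

def keep_insert(body: bytes, shards: int, my_shard: int) -> bool:
    # body is everything after the 'I' message type byte
    (oid,) = struct.unpack_from("!I", body, 0)
    assert body[4:5] == b"N"  # marks the new tuple that was inserted
    columns = parse_tuple_data(body[5:])
    key = columns[relations[oid]]
    return shard_for(key, shards) == my_shard  # same hash as the COPY stage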

Much like COPY, the receiver doesn't have any expectations about the number of table changes it gets, as long as the overall protocol consistency is preserved. Our proxy makes sure that only optional messages, like Insert, Update, and Delete, are dropped, while maintaining connection state.

Optimizations

As the number of shards grows, it's easy to see how this system can become inefficient. For instance, resharding the cluster will require sending the same data <number of shards> x <number of new shards> times, while maintaining an equal number of replication slots and streams.

Durable cache

The data multiplier isn't necessary. Since the same data is being sent (and mostly dropped), we can cache it in the proxy. The proxy maintains an internal state of all connections between databases and can multiplex both COPY messages and table updates.

Much like a transaction pooler uses one connection to serve multiple clients, the proxy can maintain one logical replication connection to the database while serving the same data to all shards. This will require the proxy to maintain its own data pool and have strong durability guarantees.

Push changes

Instead of caching data as it comes through, the proxy can create logical replication connections and push changes to the shards as it receives them. The shards in this case will receive data in the form of standard COPY statements for the initial sync and INSERT/UPDATE/DELETE queries for real time updates.

This mechanism will require the proxy to behave like a distributed transaction coordinator and for shards to use two-phase commit [4].
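As a rough sketch of the mechanics (not PgDog's coordinator), Postgres exposes two-phase commit through PREPARE TRANSACTION and COMMIT PREPARED, which psycopg2 wraps in its tpc_* API; the shards need max_prepared_transactions > 0 for this to work.

import psycopg2

def commit_everywhere(shard_dsns, statements_per_shard, gid="resharding-batch-1"):
    conns = [psycopg2.connect(dsn) for dsn in shard_dsns]
    xids = [conn.xid(0, gid, f"shard-{i}") for i, conn in enumerate(conns)]
    try:
        # Phase 1: apply each shard's statements and prepare the transaction.
        for conn, xid, statements in zip(conns, xids, statements_per_shard):
            conn.tpc_begin(xid)
            with conn.cursor() as cur:
                for sql in statements:
                    cur.execute(sql)
            conn.tpc_prepare()
        # Phase 2: every shard prepared successfully, so commit everywhere.
        for conn in conns:
            conn.tpc_commit()
    finally:
        for conn in conns:
            conn.close()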

Both of these optimizations need a deeper dive which will be written separately.

Sharding function

Picking the right sharding function proved to be easier than expected. The proxy uses the same hashing function as declarative partitioning [5], which was introduced in PostgreSQL 10. This function has several benefits:

  1. Shards can maintain their own data partitions, which provides a second layer of safety against accidental writes (see the sketch after this list)
  2. Data can be sharded separately using partitioned foreign tables [6]
  3. The hashing function has good data distribution characteristics and produces partitions (and shards) of roughly equal size
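To illustrate point 1, a shard can attach only its own hash partition, so a row that belongs elsewhere has no partition to land in and the write fails. The connection details below are made up, and the unique constraint on email is omitted here because unique constraints on a partitioned table must include the partition key.

import psycopg2

# Shard 0 of 2 keeps only its own partition of a hash-partitioned table.
conn = psycopg2.connect("host=shard_0 dbname=app user=postgres")
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE users (
            id BIGINT PRIMARY KEY,
            email VARCHAR NOT NULL
        ) PARTITION BY HASH (id)
    """)
    cur.execute("""
        CREATE TABLE users_shard_0 PARTITION OF users
        FOR VALUES WITH (MODULUS 2, REMAINDER 0)
    """)
# An INSERT whose id hashes to the other shard now fails: no partition for the row.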

Next steps

This post is the first in the series on horizontally scaling PostgreSQL with automatic sharding. The network proxy is called PgDog and is being developed as an open source project.

In later posts, we'll explore topics like routing queries to the right shards and implementing cross-shard transactions, including support for aggregate [7] functions and sorting.

References

  1. COPY protocol documentation

  2. Logical replication message formats

  3. Copy-out mode guarantees

  4. Prepared transactions

  5. Table partitioning

  6. Partitioned foreign tables

  7. Aggregate functions