Routing queries in sharded Postgres
When you split your database into shards, you need to figure out which one has the data you're looking for. The simplest solution is to query all of them, but this is slow, even if done in parallel, and doesn't scale.
However, if you know the sharding function used to split the data, you can apply that same function to your query parameters and send the query to only one shard.
This post is part of a series about sharding Postgres. The solution is being developed as an open source project.
How to shard
Sharding your database is not always obvious, but there are a few guidelines. The golden rule is to avoid having your app query more than one shard during an HTTP request.
If you're running a B2B SaaS, sharding on tenant_id will make your database easy to query: data relevant to a customer will always be on the same machine.
If you're more of a consumer-facing business, you'd have to look at how your data is used by your application. If most of your queries are centered around a user, sharding on users.id will produce an even split and make your data easily accessible.
On the other hand, if you're a social network, your app is essentially a graph of users, so you'd shard on something that connects them, like geography; users.city_id comes to mind.
Two problems, one solution
Once you've picked your sharding key and sharded your database, your app needs to be able to:
- Connect to all the shards
- Figure out which shard has the data for each query
The first problem is called service discovery: how many shards there are, where they are, and how to connect to them. The second is query routing.
Both can be solved by running a proxy between clients and databases. The proxy understands the PostgreSQL wire protocol[1], so it can extract the sharding key from the query, hash its value, and send the query to the right database.
                                                                                   +----------+
                                                                                   |          |
                                                            +-hash(id) % 2 == 0 ---+ Shard 0  |
+-----------+                             +----------+      |                      |          |
|           |                             |          |      |                      +----------+
|  Client   +-----SELECT * FROM users ----+  Proxy   +------+                                  
|           |        WHERE id = $1        |          |      |                      +----------+
+-----------+                             +----------+      |                      |          |
                                                            +-hash(id) % 2 == 1 ---+ Shard 1  |
                                                                                   |          |
                                                                                   +----------+
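The routing step in the diagram boils down to a function from sharding key to shard number. Below is a minimal Python sketch. SHA-256 is an assumption made for illustration, not necessarily the function the proxy uses: any stable hash works, as long as it is the same one that was used to split the data. Python's built-in `hash()` is avoided because string hashes are randomized per process.

```python
import hashlib

def shard_for(key, num_shards: int) -> int:
    """Map a sharding key to a shard number.

    SHA-256 is an illustrative choice (assumption); the only requirement is
    that it matches the function used when the data was split.
    """
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    # Interpret the first 8 bytes as an unsigned integer, then reduce modulo the shard count.
    return int.from_bytes(digest[:8], "big") % num_shards

# The same key always maps to the same shard, whether it arrives as 25 or "25".
shard = shard_for(25, 2)
```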
PostgreSQL wire protocol
PostgreSQL queries come in two flavors: simple queries and prepared statements. Simple queries are just text, including parameters:
SELECT * FROM users WHERE id = 25
Extracting parameters from queries is straightforward: using a parser like pg_query, our proxy can figure out that the query is targeting the "users" table and asking for the row where the primary key is equal to 25.
Simple queries are sent by the client in their own Query[2] message, which is how the proxy tells them apart from prepared statements:
Q | \x00\x00\x00& | SELECT * FROM users WHERE id = 25
The message contains 3 fields:
| Field | Size | Contents | 
|---|---|---|
| Type | 1 byte | 'Q' | 
| Length | 4 bytes | Length of the message in bytes, including self. | 
| Query | Variable | Query text, terminated by a null byte. | 
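To make the layout concrete, here is a small Python sketch that builds and decodes a Query message with the standard `struct` module (network byte order). The helper names are hypothetical. The length field counts itself but not the leading type byte, which is why the query above produces a length of 38 (`&`).

```python
import struct

def encode_query(sql: str) -> bytes:
    """Build a simple-protocol Query ('Q') message."""
    body = sql.encode("utf-8") + b"\x00"  # query text is null-terminated
    # Length covers the 4-byte length field plus the body, not the type byte.
    return b"Q" + struct.pack("!I", 4 + len(body)) + body

def decode_query(msg: bytes) -> str:
    """Extract the SQL text from a Query message."""
    if msg[:1] != b"Q":
        raise ValueError("not a Query message")
    (length,) = struct.unpack("!I", msg[1:5])
    body = msg[5 : 1 + length]
    return body.rstrip(b"\x00").decode("utf-8")

msg = encode_query("SELECT * FROM users WHERE id = 25")
# msg[:5] == b"Q\x00\x00\x00&"  (type byte, then length 38)
```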
Prepared statements
Prepared statements send the query text and parameters in two separate messages. The query text arrives in a Parse[3] message:
P | \x00\x00\x00/ | __stmt_1 | SELECT * FROM users WHERE id = $1 | \x00\x00
This message has more fields, but we care only about 2 of them:
| Field | Size | Contents | 
|---|---|---|
| Statement name | Variable | Name of the prepared statement. | 
| Query | Variable | The text of the prepared statement. | 
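A Parse message can be decoded the same way. This Python sketch reads the two null-terminated strings we care about and ignores the parameter type OIDs that follow; `encode_parse` is a hypothetical helper included only to produce a test message that declares no parameter types.

```python
import struct

def read_cstring(buf: bytes, pos: int) -> tuple[str, int]:
    """Read a null-terminated string at pos; return it and the offset after it."""
    end = buf.index(b"\x00", pos)
    return buf[pos:end].decode("utf-8"), end + 1

def decode_parse(msg: bytes) -> tuple[str, str]:
    """Return (statement name, query text) from a Parse ('P') message."""
    if msg[:1] != b"P":
        raise ValueError("not a Parse message")
    (length,) = struct.unpack("!I", msg[1:5])
    body = msg[5 : 1 + length]
    name, pos = read_cstring(body, 0)
    query, pos = read_cstring(body, pos)
    # The remaining fields (parameter type OIDs) are not needed for routing.
    return name, query

def encode_parse(name: str, query: str) -> bytes:
    """Build a Parse message with zero declared parameter types."""
    body = name.encode("utf-8") + b"\x00" + query.encode("utf-8") + b"\x00"
    body += struct.pack("!H", 0)  # number of parameter type OIDs
    return b"P" + struct.pack("!I", 4 + len(body)) + body
```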
Finally, the query parameters arrive separately in a Bind[4] message:
B | \x00\x00\x00\x1c | "" | __stmt_1 | \x00\x01 | \x00\x00 | \x00\x01 | \x00\x00\x00\x02 | 25 | \x00\x00
This message is a bit more complex. It contains the value of the $1 parameter, along with format codes that say whether each value is sent as text or binary. Because Bind refers to the prepared statement only by name, the proxy needs a stateful parser: it remembers the query text from each Parse message and maps every Bind back to its statement. The statement is valid SQL, so we can parse it using pg_query, find which parameter corresponds to the sharding key, and hash that parameter's value.
Once we compute the shard number, we can send the query only to the shard that can hold the matching rows, and that's how Postgres scales horizontally.
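The stateful part can be sketched in Python as follows. It decodes the Bind message shown above byte for byte, then looks up which parameter holds the sharding key. The `sharding_params` map is an assumption: in the proxy, that information would come from parsing the statement text with pg_query when the Parse message arrives, whereas here it is passed in directly. The SHA-256 hash is likewise illustrative, and the sketch assumes text-format parameters (format code 0), as in the example.

```python
import hashlib
import struct

def read_cstring(buf: bytes, pos: int) -> tuple[str, int]:
    """Read a null-terminated string at pos; return it and the offset after it."""
    end = buf.index(b"\x00", pos)
    return buf[pos:end].decode("utf-8"), end + 1

def decode_bind(msg: bytes) -> tuple[str, list]:
    """Return (statement name, raw parameter values) from a Bind ('B') message."""
    if msg[:1] != b"B":
        raise ValueError("not a Bind message")
    (length,) = struct.unpack("!I", msg[1:5])
    body = msg[5 : 1 + length]
    _portal, pos = read_cstring(body, 0)
    statement, pos = read_cstring(body, pos)
    (n_formats,) = struct.unpack_from("!H", body, pos)
    pos += 2 + 2 * n_formats  # skip the format codes (0 = text, 1 = binary)
    (n_params,) = struct.unpack_from("!H", body, pos)
    pos += 2
    values = []
    for _ in range(n_params):
        (size,) = struct.unpack_from("!i", body, pos)  # signed: -1 means NULL
        pos += 4
        if size == -1:
            values.append(None)
        else:
            values.append(body[pos : pos + size])
            pos += size
    return statement, values  # trailing result-column format codes are ignored

def shard_for_bind(msg: bytes, sharding_params: dict, num_shards: int) -> int:
    """Route a Bind message using state remembered from its Parse message.

    sharding_params (hypothetical) maps statement name -> zero-based index of
    the parameter compared against the sharding key.
    """
    statement, values = decode_bind(msg)
    raw = values[sharding_params[statement]]
    if raw is None:
        raise ValueError("sharding key is NULL")
    # Assumes text format, so the value is the decimal string b"25".
    digest = hashlib.sha256(raw).digest()
    return int.from_bytes(digest[:8], "big") % num_shards

# The Bind message from the example above, byte for byte.
EXAMPLE_BIND = (
    b"B\x00\x00\x00\x1c"           # type + length (28)
    b"\x00"                        # portal name: "" (unnamed)
    b"__stmt_1\x00"                # prepared statement name
    b"\x00\x01\x00\x00"            # 1 format code: 0 (text)
    b"\x00\x01\x00\x00\x00\x0225"  # 1 parameter, 2 bytes long: "25"
    b"\x00\x00"                    # 0 result-column format codes
)
```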
Cross-shard queries
Cross-shard queries span multiple shards because they don't contain a sharding key. While it's best to avoid them, they are often unavoidable. For example, if we want to fetch all users located in a few particular cities, and the database is sharded on users.id, the query won't contain a sharding parameter:
SELECT * FROM users WHERE city_id IN (1, 25, 50) AND active = true
The proxy handles this case by sending the same query to all shards in parallel and forwarding rows to the client as they arrive from each shard. Rows aren't buffered in the proxy's memory, so there is no limit on how many rows these queries can fetch.
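A minimal Python sketch of this fan-out, assuming each shard is represented by a hypothetical callable that runs the query and yields rows. One thread per shard pushes rows into a bounded queue, and the caller receives each row as soon as any shard produces it, so memory use stays bounded regardless of result size. Error handling and cancellation are omitted.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator

_DONE = object()  # sentinel: one shard has finished sending rows

def fan_out(shards: list[Callable[[str], Iterable]], sql: str) -> Iterator:
    """Send the same query to every shard in parallel; yield rows as they arrive."""
    rows: queue.Queue = queue.Queue(maxsize=1000)  # bounded: workers block if the client is slow

    def worker(run_query: Callable[[str], Iterable]) -> None:
        try:
            for row in run_query(sql):
                rows.put(row)
        finally:
            rows.put(_DONE)  # always signal completion, even on error

    for run_query in shards:
        threading.Thread(target=worker, args=(run_query,), daemon=True).start()

    remaining = len(shards)
    while remaining:
        item = rows.get()
        if item is _DONE:
            remaining -= 1
        else:
            yield item  # forwarded immediately, never collected
```

Rows from different shards interleave in whatever order they arrive, which is exactly why ordered results need extra work.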
Sorting & aggregates
While it's nice to be able to query multiple shards, results returned in arbitrary order are inconvenient. More often than not, clients want some kind of ordering to make sense of the data:
SELECT * FROM users WHERE city_id IN (1, 25, 50) ORDER BY last_login DESC;
To support sorting, the proxy needs to receive all rows from all shards first, sort them by the requested column(s), and only then send them to the client. Aggregates work the same way, except that before sending rows to the client, the proxy needs to merge some of them, depending on the aggregate function.
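A minimal Python sketch of the buffering approach, with rows represented as dictionaries (an assumption for illustration). Sorting cannot start until every shard has answered; for COUNT(*), the per-shard partial counts are simply summed.

```python
from operator import itemgetter

def merge_sorted(shard_results, column, descending=False):
    """Buffer every row from every shard, then sort them all by `column`."""
    all_rows = [row for rows in shard_results for row in rows]
    return sorted(all_rows, key=itemgetter(column), reverse=descending)

def merge_count(partial_counts):
    """Each shard returns its own COUNT(*); the total is their sum."""
    return sum(partial_counts)
```

Not every aggregate merges this simply: averaging per-shard averages gives the wrong answer when shards hold different numbers of rows, so AVG has to be rebuilt from per-shard sums and counts.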
More details on sorting and aggregation will be shared in a separate article.
Bypassing the sharder
Sometimes it's not desirable to query multiple shards simultaneously, for example when the result set is too large, or when we know in advance which shard has the information we're looking for.
The client can choose the shard it wants to talk to by specifying it in a query comment:
/* pgdog_shard: 0 */ SELECT * FROM users WHERE city_id = 5
Of course, this assumes the caller knows how many shards there are. While not ergonomic, this "break glass" feature is nice to have for use cases not currently supported by the proxy.
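Extracting the hint can be sketched with a regular expression. This is an illustration of the idea, not necessarily how the proxy parses comments; `shard_hint` is a hypothetical name. It rejects shard numbers outside the cluster, since a bad hint should fail loudly rather than be routed somewhere arbitrary.

```python
import re
from typing import Optional

# Matches a comment like /* pgdog_shard: 0 */ anywhere in the query text.
SHARD_HINT = re.compile(r"/\*\s*pgdog_shard:\s*(\d+)\s*\*/")

def shard_hint(sql: str, num_shards: int) -> Optional[int]:
    """Return the shard requested in a query comment, or None if there is no hint."""
    match = SHARD_HINT.search(sql)
    if match is None:
        return None  # no hint: fall back to normal routing
    shard = int(match.group(1))
    if shard >= num_shards:
        raise ValueError(f"shard {shard} does not exist (cluster has {num_shards})")
    return shard
```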
Special queries
It's common for clients to execute queries that don't target any tables. For example, SELECT 1 checks whether the connection is still alive, and SELECT NOW() asks for the current time in the database timezone. Neither contains any sharding information. Without special handling, they would be sent to all shards and return more rows than expected.
These are handled by sending them to just one shard. The shard is selected using a round robin algorithm, making sure the load (no matter how small) is spread evenly across the cluster.
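Round-robin selection takes only a few lines. In this Python sketch the class name is hypothetical, and the lock is there because many client connections may ask for a shard concurrently.

```python
import itertools
import threading

class RoundRobin:
    """Pick shards in turn for queries that carry no sharding information."""

    def __init__(self, num_shards: int):
        self._shards = itertools.cycle(range(num_shards))
        self._lock = threading.Lock()  # serialize access from concurrent connections

    def next_shard(self) -> int:
        with self._lock:
            return next(self._shards)
```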
Handling data changes
A query that updates a particular user has the sharding key as one of the parameters:
UPDATE users SET last_login = NOW() WHERE id = $1
Extracting this parameter and routing the update works the same way as for SELECTs, so it isn't worth discussing further. For queries that update multiple rows, the simplest solution is to treat them as cross-shard queries and send them everywhere. We could spend time optimizing this for clusters with many shards and queries that target only a few rows, but more often than not, this type of query will touch all shards anyway.
INSERTs are more interesting. The base case, i.e., inserting only one row, is simple:
INSERT INTO users (id, email, last_login) VALUES ($1, $2, $3)
Knowing that parameter $1 corresponds to the id column, we can hash it and route the query to the corresponding shard. For queries that insert multiple rows, however, the proxy has to perform some extra steps.
For each tuple in the INSERT statement, the proxy needs to:
- Hash the sharding column parameter
- Place tuples that map to the same shard into their own "box"
For each "box", the proxy needs to:
- Generate a new INSERT query
- Send it to the corresponding shard separately
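These steps can be sketched in Python. The function below groups rows by shard and generates one INSERT per box, renumbering the `$n` placeholders so each generated query is self-consistent. It assumes the row values have already been extracted from the original statement as parameter tuples; the function names and the SHA-256 hash are illustrative assumptions.

```python
import hashlib
from collections import defaultdict

def shard_for(key, num_shards: int) -> int:
    # Illustrative stable hash (assumption); must match the function used to split the data.
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards

def split_insert(table, columns, rows, sharding_column, num_shards):
    """Split a multi-row INSERT into one INSERT per shard.

    Returns {shard: (sql, params)}, with params flattened in placeholder order.
    """
    key_index = columns.index(sharding_column)
    boxes = defaultdict(list)
    for row in rows:  # hash each tuple's sharding column and drop it in its shard's box
        boxes[shard_for(row[key_index], num_shards)].append(row)

    queries = {}
    for shard, box in boxes.items():
        tuples, params = [], []
        for row in box:
            start = len(params) + 1  # placeholders restart at $1 in every new query
            placeholders = ", ".join(f"${i}" for i in range(start, start + len(row)))
            tuples.append(f"({placeholders})")
            params.extend(row)
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES {', '.join(tuples)}"
        queries[shard] = (sql, params)
    return queries
```

Each resulting query is then sent to its shard on its own, which is what raises the consistency question below.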
Consistency
When writing to multiple databases, it's possible that some writes succeed while others fail. Partial failure is inherent to distributed systems[5], but we can minimize its impact.
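PostgreSQL supports two-phase commit[6]. This allows us to use prepared transactions when writing changes to multiple shards: each shard first prepares its part of the transaction, and the proxy commits only once every shard has prepared successfully. Support for this is in the works, and more on this will be shared in a separate article.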
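A coordinator built on PostgreSQL's `PREPARE TRANSACTION`, `COMMIT PREPARED`, and `ROLLBACK PREPARED` commands could look roughly like this Python sketch. The connection objects (anything with an `execute(sql)` method that raises on error) and the transaction ID format are hypothetical. The shards need `max_prepared_transactions` set above zero (the default is 0) for `PREPARE TRANSACTION` to work, and a failure during the commit phase still leaves prepared transactions behind that must be finished later.

```python
import uuid

def two_phase_commit(shards, statements_by_shard):
    """Apply writes on several shards atomically using two-phase commit.

    shards: {shard number: connection with an execute(sql) method} (hypothetical).
    statements_by_shard: {shard number: [SQL statements to run on that shard]}.
    """
    gid = f"txn_{uuid.uuid4().hex}"  # hypothetical global transaction ID format
    prepared = []
    current = None
    try:
        # Phase 1: run each shard's writes and PREPARE them, without committing.
        for shard, statements in statements_by_shard.items():
            current = shard
            conn = shards[shard]
            conn.execute("BEGIN")
            for sql in statements:
                conn.execute(sql)
            conn.execute(f"PREPARE TRANSACTION '{gid}'")
            prepared.append(shard)
    except Exception:
        # Abort: roll back the shard that failed mid-transaction...
        if current is not None and current not in prepared:
            try:
                shards[current].execute("ROLLBACK")
            except Exception:
                pass  # the connection may already be unusable
        # ...and discard the transactions already prepared on other shards.
        for shard in prepared:
            shards[shard].execute(f"ROLLBACK PREPARED '{gid}'")
        raise
    # Phase 2: every shard prepared successfully, so commit everywhere.
    # If this loop fails partway, the remaining prepared transactions persist
    # on their shards and must be committed later by a recovery process.
    for shard in prepared:
        shards[shard].execute(f"COMMIT PREPARED '{gid}'")
    return gid
```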
Additionally, our proxy can implement its own write-ahead log. Keeping track of transactions that succeed, and replaying those that fail, will improve the overall reliability of the system. The proxy will need to run on durable, replicated storage, like a RAID array or network-attached storage (AWS EBS, for example).
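As a rough illustration of the idea (a hypothetical design, not the project's implementation), the log could be an append-only file of JSON lines: a write is recorded before it is sent to the shards and marked done once every shard has committed. After a restart, entries without a matching done record are the ones to replay. Replaying is only safe if the writes are idempotent, or if the proxy can first check whether they already committed.

```python
import json
import os

class ProxyWal:
    """Append-only log of cross-shard writes (hypothetical design sketch)."""

    def __init__(self, path: str):
        self.path = path

    def _append(self, record: dict) -> None:
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())  # the record must be on disk before we proceed

    def begin(self, txid: str, statements_by_shard: dict) -> None:
        # Note: JSON turns integer shard keys into strings.
        self._append({"txid": txid, "state": "begin", "statements": statements_by_shard})

    def done(self, txid: str) -> None:
        self._append({"txid": txid, "state": "done"})

    def pending(self) -> dict:
        """Return {txid: statements} for writes that were never marked done."""
        open_entries = {}
        if not os.path.exists(self.path):
            return open_entries
        with open(self.path, encoding="utf-8") as f:
            for line in f:
                record = json.loads(line)
                if record["state"] == "begin":
                    open_entries[record["txid"]] = record["statements"]
                else:
                    open_entries.pop(record["txid"], None)
        return open_entries
```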
Next steps
This article is part of a series on sharding Postgres. The project is built in the open with all code available under an open source license.
Upcoming articles will cover sorting, aggregation, and other topics like cross-shard transactions and distributed migrations.