MySQL Sharding at Quora

Category: IT Technology · Published: 4 years ago

Team: Nagavamsi (Vamsi) Ponnekanti, Lingduo Kong, Hwan Seung Yeo (manager)

In this blog post, we discuss how Quora was able to scale our usage of MySQL to meet the growing requirements of our content. We’ll focus especially on the challenge of sharding the data stored in MySQL at scale.

MySQL at Quora

A simplified architecture diagram is shown below.

[Figure: simplified architecture diagram of MySQL at Quora]

We use MySQL to store critical data such as questions, answers, upvotes, and comments. The data stored in MySQL is on the order of tens of TB, not counting replicas, and we serve on the order of hundreds of thousands of queries per second. As an aside, we also store a lot of other data in HBase. If MySQL is slow or unresponsive, the Quora site is severely impacted.

Over the years, MySQL usage at Quora has grown in multiple dimensions including the number of tables, table sizes, read QPS (queries per second), write QPS, etc. In order to improve performance, we implemented caching using Memcache as well as Redis. The support in Redis for data structures such as lists is a big reason for using both the caching systems. While the addition of caching helped with our increasing volume of reads, the growth of data size and write QPS led us to look into sharding MySQL.

Vertical Sharding

A couple of years ago, our infrastructure team implemented the ability to move a table from one MySQL host to another. This allowed us to have multiple masters with different tables on different masters, which we refer to as “vertical sharding”. Each one of these masters, along with its slaves, is called a “partition” at Quora.

The mapping from a partition to the list of tables in that partition is stored in Zookeeper (ZK). Likewise, the mapping from a partition to the master/slave physical hosts is also stored in Zookeeper.
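The two mappings can be pictured with a minimal sketch. Plain Python dicts stand in for the Zookeeper nodes here, and every partition, table, and host name below is a made-up example rather than Quora's actual layout.

```python
# Hypothetical stand-ins for the two ZooKeeper mappings; all names are made up.
partition_tables = {
    "partition_1": {"questions", "answers"},
    "partition_2": {"upvotes", "comments"},
}
partition_hosts = {
    "partition_1": {"master": "db1-master", "slaves": ["db1-slave-a", "db1-slave-b"]},
    "partition_2": {"master": "db2-master", "slaves": ["db2-slave-a"]},
}


def partition_of(table):
    """Return the name of the partition that currently holds `table`."""
    for partition, tables in partition_tables.items():
        if table in tables:
            return partition
    raise KeyError(f"table not found: {table}")


def master_of(table):
    """Resolve table -> partition -> master host."""
    return partition_hosts[partition_of(table)]["master"]


def move_table(table, destination):
    """Change the table's location by editing only the metadata."""
    partition_tables[partition_of(table)].remove(table)
    partition_tables[destination].add(table)
```

With this shape, relocating a table is purely a metadata change from the client's point of view: once the mapping is updated, lookups resolve to the new master.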

In order to scale up in a partitioned environment, we would create a new partition, i.e., start a new MySQL server (with slaves), and move some large or high-traffic tables over to it. To move table(s) from one partition to another, we designed the following process:

1. Use the mysqldump tool to dump the table in a single transaction along with the binlog position (those unfamiliar with MySQL binlogs can refer to 20.1 Binary Log Overview)

2. Restore the dump on the destination master

3. Replay binlogs just for this table on the destination master

4. When the replay is almost caught up, we do a “cutover” that involves two manual steps:

  a. Rename the table on the source partition to stop traffic to it. Table T is renamed to T__bak. Soon after the rename, the script that is replaying binlogs sees the rename table operation in the binlog and exits, as there will be no more updates to the table to replay.
  b. Run a command to update the location of the table in ZK from the source partition to the destination partition.

Note that this approach favors consistency over availability, as there is a small window of table unavailability (“table not found” errors) between steps 4(a) and 4(b) above.
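The replay behaviour in steps 3 and 4(a) can be sketched roughly as follows. This is an illustration under assumptions, not the actual script: binlog events are modelled as plain dicts with hypothetical "table" and "type" fields, and `apply` is whatever callable writes a row change to the destination master.

```python
def replay_table_events(events, table, apply):
    """Replay binlog events for one table and stop once the table is renamed.

    `events` is an iterable of dicts standing in for decoded binlog events.
    Returns the number of row events that were applied.
    """
    applied = 0
    for event in events:
        if event["table"] != table:
            continue  # step 3: only this table's changes are replayed
        if event["type"] == "rename":
            break  # step 4(a): the rename means no more updates will follow
        apply(event)
        applied += 1
    return applied
```

Because the rename itself travels through the binlog, the replay needs no separate signal to know that it has seen the last update.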

An advantage of this approach is that if something goes wrong we have the ability to undo. Undo basically involves replaying any changes that may have happened to the table in the reverse direction (i.e. from its new location back to its old location) and then changing the location of the table in ZK back to the original location.

Vertical sharding enabled us to scale out, rather than always needing to scale up as our data size and QPS increased. There are some disadvantages to this approach. While the table movement process outlined above works well for small tables, for large tables it can result in replication lag for the slaves of the destination partition. The table movement script can also move multiple tables together, but if the total size of those tables is large we are likely to see the replication lag problem.

If two tables need to be joined in MySQL, they need to live in the same partition. Joins inside MySQL were strongly discouraged in the codebase so that we would have more freedom in choosing which tables to move for scale out. The result is that we do very few joins in MySQL. Vertical sharding also meant giving up some of our transactional functionality.

Need for Horizontal Sharding

Vertical sharding was a good first step to scale our usage of MySQL to meet the growing requirements of our content. However, individual tables can still grow very large with vertical sharding, so we also considered horizontal sharding as a complement. Horizontal sharding involves splitting a logical table into multiple physical tables, which are then referred to as shards of the table. There are a number of reasons why large tables can be problematic and thus horizontal sharding is an appealing addition:

1. Unknown Risks: Most companies that need to scale their MySQL usage utilize horizontal sharding. Not following this common practice means we’re more likely to encounter issues with large tables that other companies generally don’t face.

2. The online schema change tool (refer to pt-online-schema-change for details) encounters these issues with huge tables:

  1. Needs 2X the space
  2. A schema change could require several hours (sometimes more than a day) if we want to avoid significant load on the online system. Occasionally it will abort due to load spikes and need to be restarted from the beginning.
  3. Though it is an “online” schema change tool, at the start it takes a short-duration X (exclusive) lock on the entire table in order to create triggers. We have seen the X lock sometimes impact our production traffic. By contrast, if the table has 10 shards and the change is applied one shard at a time, only ~10% of the table would be locked at any given time.
  4. Triggers are created on the entire table for the entire duration of a schema change. This can cause significant overhead and/or performance problems. Imagine a table T with X modifications per second (i.e. insert or delete or update) that needs H hours for the online schema change. Triggers would cause an additional X modifications per second for H hours. On the other hand, if the table has 10 shards then at any given time we need to have triggers on only one shard.

3. Since large tables are not good candidates for the table move operation, our opportunities for vertical sharding are restricted.

4. Other unexpected issues: For example, MySQL sometimes chooses the wrong index when reading or writing. Choosing the wrong index on a 1TB table can wreak much more havoc than choosing the wrong index on a 100GB table.
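The trigger overhead described in item 2 can be put into rough numbers. The sketch below rests on simplifying assumptions that are ours, not measurements: each modification causes exactly one extra trigger write, writes are spread evenly across shards, shards are changed one at a time, and each shard's schema change takes 1/N of the unsharded duration.

```python
def trigger_overhead(mods_per_sec, hours, num_shards=1):
    """Return (extra writes per second while a change runs, total extra writes).

    Assumes one extra trigger write per modification, evenly spread writes,
    and shards changed one at a time, each taking hours / num_shards.
    """
    extra_per_sec = mods_per_sec / num_shards  # triggers exist on one shard only
    total_seconds = hours * 3600  # num_shards changes of hours / num_shards each
    return extra_per_sec, extra_per_sec * total_seconds


unsharded = trigger_overhead(1000, 10)               # X = 1000, H = 10
sharded = trigger_overhead(1000, 10, num_shards=10)  # same table in 10 shards
```

Under these assumptions the sharded table carries a tenth of the extra write rate at any moment, and a tenth of the total extra writes over the whole change.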

We believe that Quora still has a lot of growth ahead of us. Over time, tables are likely to get much larger and so we decided to start investing in horizontal sharding.

Horizontal Sharding

In this section, we will discuss the key decisions that were made when implementing horizontal sharding as well as the design of key components. Topics will include:

  1. The build vs buy decision
  2. Whether to shard at the logical database level or the table level
  3. Whether to do range-based sharding or hash-based sharding
  4. Managing the metadata of shards
  5. An API to access sharded tables
  6. Choosing the sharding column for a table
  7. How many shards to create
  8. Migrating from an unsharded table to a sharded table
  9. What to do if a shard grows very large

Build vs Buy: Do it on our own or use 3rd party solutions?

One of our first decisions was whether to do sharding on our own or use a 3rd party solution. This was not an easy decision. We chose to do it on our own for the following reasons:

1. There tends to be a high integration cost to 3rd party software when you factor in the cost of developing expertise in it, making code changes in it, etc. Though we have several hundred tables, fewer than 10 seemed large enough to need sharding. If we had ~100 or more tables to shard, then the cost of integration with 3rd party software might have been easier to justify.

2. We already did vertical sharding and can reuse much of that infrastructure (support for different tables on different hosts) and code (replaying changes from binlogs) for horizontal sharding.

3. We already moved away from using SQL text in our database APIs, as using SQL text meant it was easy for developers to write queries that are vulnerable to SQL injection attacks. The security team had changed our database APIs to take in explicit arguments for various parts of a SQL statement, such as table name, select clause list, expression for where clause, order by column list, etc. The database APIs would then build the SQL text from those arguments and send it to the database. This makes it easier to enhance the APIs to work for sharded tables, as we shall see later. We did not have to do the work of parsing SQL text and modifying it to work for the sharded table.

4. Joins in SQL make horizontal sharding more complex. As mentioned above we have very few joins in our codebase. We plan to gradually move those joins into the application as it becomes necessary.

5. We have plans to move to graph-based APIs gradually (along the lines of Facebook TAO, described in https://www.usenix.org/system/fi..., or Pinterest Zen, discussed in “Zen: Pinterest's Graph Storage Service - @Scale 2014 - Data”). Using 3rd party software for sharding and also using a graph storage service would mean two “middleware” layers between web and MySQL, which could hurt latency.

Shard at the logical database level or the table level

Suppose a MySQL instance has a logical database “quoradb” which has 5 tables A, B, C, D and E.

[Figure: a MySQL instance with logical database “quoradb” containing tables A, B, C, D and E]

We considered whether to shard at the logical database level or at the table level. Sharding at the logical database level versus the table level is illustrated in the following picture for 2 shards:

[Figure: sharding at the logical database level versus sharding at the table level, for 2 shards]

  1. If we were to shard at the logical database level the result would be multiple logical databases, such as quoradb1, quoradb2, quoradb3, etc., on the same or different MySQL instances. Each database would have a part of the 5 tables.
  2. If we were to shard at the table level then we could choose to only shard a few of the large tables. Note that shards of the table could be on the same or different MySQL instances. Each MySQL instance would have a logical database “quoradb”, but within “quoradb” different instances would have different tables or different shards of the same table. This approach is a natural extension of vertical sharding that already allows different tables to be on different MySQL instances. The main difference is some physical tables could now be shards.

We would have preferred sharding at the logical database level, as this would have been the simpler approach. However, the Quora codebase is very large and makes extensive use of secondary indexes, since the code for accessing tables was written assuming there is no horizontal sharding. With sharding, all the secondary index data is indexed only within a shard. Many queries using secondary indexes may have to query ALL the shards of the table unless we were to convert those secondary indexes into separate tables and shard them differently than the base table. This would require a lot of effort to make sharding at the logical database level feasible. If we shard at the table level, we're able to be selective about which tables we shard. Overall, we believe that sharding at the table level would be significantly less effort.

Range-based sharding or hash-based sharding

When a logical table is split into multiple physical tables, an important question is how do we split the data in the original table among the new shards. Two popular choices are range-based sharding and hash-based sharding. At Quora, we make frequent use of range queries and so we started there since hash-based sharding would not perform well with range queries on a sharding key. Note that even in HBase the regions of a table are range-based shards.

The next question is where do we track which shards have what range of sharding keys. We decided to encode the range information into the shard name itself. If a shard of a table T holds values >= x and < y, we name the shard as T_shard_x_y. The last shard may be named T_shard_x_MAX if there is no upper bound. Likewise the first shard may be named T_shard_MIN_y if there is no lower bound. (NOTE: If a sharding column is binary, varbinary, char, or a varchar data type then x or y could have characters that are not desirable inside a table name. To avoid this we convert x or y to hex strings, and use that in the shard name.)
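The naming scheme is easy to express in code. Here is a minimal sketch, assuming integer bounds are written in decimal and string or binary bounds are hex-encoded; the helper names shard_name() and parse_shard_name() are hypothetical, and negative integers are not handled.

```python
def encode_bound(value):
    """Encode one range bound so it is safe inside a table name."""
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        value = value.encode("utf-8")
    return value.hex()  # bytes -> hex string, e.g. b"a/b" -> "612f62"


def shard_name(table, lower=None, upper=None):
    """Name the shard of `table` that holds keys >= lower and < upper."""
    lo = "MIN" if lower is None else encode_bound(lower)
    hi = "MAX" if upper is None else encode_bound(upper)
    return f"{table}_shard_{lo}_{hi}"


def parse_shard_name(name):
    """Split a shard name into (table, lower, upper); bounds stay encoded strings."""
    prefix, lo, hi = name.rsplit("_", 2)
    if not prefix.endswith("_shard"):
        raise ValueError(f"not a shard name: {name}")
    table = prefix[: -len("_shard")]
    return table, (None if lo == "MIN" else lo), (None if hi == "MAX" else hi)
```

Keeping the range in the name means the list of shard names alone is enough to route a key, without a second lookup for the boundaries.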

Managing metadata of shards

Another important decision is where to keep the metadata associated with each shard. As noted earlier, we use Zookeeper to keep the metadata for vertical sharding (i.e., which table lives in which partition) so we store the horizontal shard metadata there as well:

  • A new ZK node “sharded_tables” has a list of all sharded tables as its children.
  • The ZK node for each sharded table has a list of all its shards as children.
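The node layout above can be sketched with a small in-memory stand-in for Zookeeper. FakeZK is a hypothetical class written for this illustration, and the absolute path /sharded_tables is an assumption; only the parent/child structure comes from the description above.

```python
ROOT = "/sharded_tables"  # assumed absolute path of the "sharded_tables" node


class FakeZK:
    """In-memory stand-in for a ZooKeeper client (hypothetical, for illustration)."""

    def __init__(self):
        self.children = {"/": set()}

    def create(self, path):
        """Create the node at `path` along with any missing parents."""
        parent = "/"
        for part in path.strip("/").split("/"):
            node = parent.rstrip("/") + "/" + part
            self.children.setdefault(node, set())
            self.children[parent].add(part)
            parent = node

    def get_children(self, path):
        return sorted(self.children[path])


def sharded_tables(zk):
    """List every sharded table (children of the sharded_tables node)."""
    return zk.get_children(ROOT)


def shards_of(zk, table):
    """List the shards of `table`, or an empty list if it is not sharded."""
    if table not in sharded_tables(zk):
        return []
    return zk.get_children(f"{ROOT}/{table}")
```

A table is treated as sharded exactly when it appears under the sharded_tables node, so the same lookup answers both "is it sharded?" and "what are its shards?".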

API to query sharded tables

As noted earlier, our database APIs take in explicit arguments for various parts of a SQL statement such as the table name, select clause list, expression for where clause, order by column list, etc. The database APIs then build the SQL text from those arguments and send the query to the database.

Here is a simple example of how our database API looks for accessing unsharded tables:

rows = select(
    columns=["topic_id", "creation_time"],
    table="user_topic2",
    conditions=[("network_id", "=", qclient.network.current_nid()), ("user", "=", user_id)],
    order_by=[("creation_time", False)],
    limit=1,
)

The connection between a SQL statement and the above API is straightforward. It generates the following SQL query, where “?” indicates values being queried for:

SELECT topic_id, creation_time
FROM user_topic2
WHERE network_id = ? AND user = ?
ORDER BY creation_time DESC
LIMIT 1
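To make the connection concrete, here is a minimal sketch of how an API of this shape could build parameterized SQL from explicit arguments. build_select() and ALLOWED_OPS are hypothetical names, not Quora's actual code. Reading the second element of each order_by tuple as an "ascending" flag is an inference from the example above, where False produced DESC. Identifier validation is omitted for brevity.

```python
ALLOWED_OPS = {"=", "!=", "<", "<=", ">", ">="}  # assumed operator whitelist


def build_select(columns, table, conditions=(), order_by=(), limit=None):
    """Return (sql_text, params) built from explicit query parts."""
    sql = f"SELECT {', '.join(columns)} FROM {table}"
    params = []
    if conditions:
        clauses = []
        for column, op, value in conditions:
            if op not in ALLOWED_OPS:
                raise ValueError(f"unsupported operator: {op}")
            clauses.append(f"{column} {op} ?")
            params.append(value)  # values are bound separately, never inlined
        sql += " WHERE " + " AND ".join(clauses)
    if order_by:
        # Each entry is (column, ascending); False means DESC.
        terms = [f"{col} {'ASC' if asc else 'DESC'}" for col, asc in order_by]
        sql += " ORDER BY " + ", ".join(terms)
    if limit is not None:
        sql += f" LIMIT {int(limit)}"
    return sql, params
```

Because the table name arrives as its own argument instead of being buried in SQL text, a sharded wrapper can substitute a shard name for it without parsing any SQL.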

Likewise, we have insert(), delete(), update(), replace() and count() APIs. To handle sharded tables, we added wrappers for each of those of the form sharded_*, e.g., sharded_select(). sharded_select() is more complex and takes 2 additional arguments compared to the select() API:

  1. sharding_col: string
    1. Used to sanity check that the table is sharded on the expected column
  2. sharding_key: a union that can be used to express one of:
    1. None => query ALL shards
    2. Single sharding key => query only the shard that contains it
    3. List of sharding keys => query all shards containing these keys
    4. Range of sharding keys (i.e., start and end) => query shards containing keys in that range

Note that the caller is responsible for figuring out whether their query is looking for one sharding key, a list of sharding keys, or a range of sharding keys as well as providing that information to the query. There are some limitations at the moment, primarily:

  • Joins, group by, having, and offset are not supported
  • Order by is supported only on the sharding column

Here is an example of using the new API:

rows = sharded_select(
    columns=["topic_id", "creation_time"],
    table="user_topic2",
    sharding_col="user",
    sharding_key=user_id,
    conditions=[("network_id", "=", network_id), ("user", "=", user_id)],
    order_by=[("creation_time", False)],
    limit=1,
)

Note: Pass ALL conditions, as in the select() API, including ("user", "=", user_id). The sharding_key argument only selects which shards to query; it does not filter rows.
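How the four forms of sharding_key could map to a set of shards is sketched below. This is an illustration under assumptions: the shard list and its boundaries are made up, a range is represented as a (start, end) tuple with both ends inclusive, and a list of keys is a Python list. The real API's encoding of the union is not described in this post.

```python
# Hypothetical shards of user_topic2 as (lower, upper, name); None means unbounded.
SHARDS = [
    (None, 1000, "user_topic2_shard_MIN_1000"),
    (1000, 2000, "user_topic2_shard_1000_2000"),
    (2000, None, "user_topic2_shard_2000_MAX"),
]


def _contains(lower, upper, key):
    """True if the shard [lower, upper) holds `key`."""
    return (lower is None or key >= lower) and (upper is None or key < upper)


def _overlaps(lower, upper, start, end):
    """True if the shard [lower, upper) overlaps the inclusive range [start, end]."""
    return (upper is None or start < upper) and (lower is None or end >= lower)


def shards_for(sharding_key, shards=SHARDS):
    """Resolve a sharding_key (None, single key, list, or range) to shard names."""
    if sharding_key is None:
        return [name for _, _, name in shards]  # query ALL shards
    if isinstance(sharding_key, tuple):  # (start, end) range
        start, end = sharding_key
        return [n for lo, hi, n in shards if _overlaps(lo, hi, start, end)]
    keys = sharding_key if isinstance(sharding_key, list) else [sharding_key]
    return [n for lo, hi, n in shards if any(_contains(lo, hi, k) for k in keys)]
```

A wrapper like sharded_select() could then run the same query against each returned shard and merge the results, which is one reason ordering is simplest when it is on the sharding column.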

Choosing the sharding column for a table

As you may know, when a table T is sharded by column x, one consequence is that queries which look up rows using a different column y will need to examine ALL shards. If the QPS of those queries is high OR if they are sensitive to latency, then having a large number of shards could cause performance problems that would not be present if the table were unsharded. So, when choosing a sharding column we need to consider the QPS and latency sensitivity of the queries on the table.

A potential optimization is to create a cross-shard index. The cross-shard index is actually a separate MySQL table that maps from a non-sharding column y to a sharding column x. It can be used to find the value(s) of x given the value of y. If the cross-shard index table itself is large then it can be sharded on column y.

If there is only one value of x for a given value for y, then with the help of the cross-shard index a query that is looking for rows of T with y = Y1 would need to examine only one shard of T. Note that there is still an overhead of first looking up Y1 in the cross-shard index in order to get the corresponding value of x, but the overhead is just one lookup.

On the other hand, if there are N values of x for a given value of y, then the cross-shard index would return N values of x, all of which would need to be looked up in T. These values may be spread across as many as N shards. As N gets large, the cross-shard index becomes less effective in reducing the number of shards of T to examine.
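Both cases can be seen in a small sketch. It assumes a hypothetical table T sharded on x with boundaries at 100 and 200, and models the cross-shard index as a dict from y to the list of matching x values; in practice the index would be a separate MySQL table.

```python
import bisect

# Hypothetical table T sharded on x: shard 0 holds x < 100,
# shard 1 holds 100 <= x < 200, shard 2 holds x >= 200.
BOUNDARIES = [100, 200]

# Hypothetical cross-shard index: non-sharding column y -> sharding column values x.
CROSS_SHARD_INDEX = {
    "y1": [150],           # one x per y: exactly one shard to examine
    "y2": [20, 120, 250],  # N values of x: up to N shards to examine
}


def shard_of(x):
    """Return the index of the shard whose range contains x."""
    return bisect.bisect_right(BOUNDARIES, x)


def shards_to_examine(y):
    """Use the cross-shard index to find which shards of T can hold rows with this y."""
    xs = CROSS_SHARD_INDEX.get(y, [])
    return {shard_of(x) for x in xs}
```

For "y1" the index narrows the query to a single shard at the cost of one extra lookup, while for "y2" all three shards still have to be examined.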

The three large tables that we have sharded so far did not need a cross-shard index. However, we expect to need it in the future for sharding other tables. Since the cross-shard index is actually a separate table that could be on a different MySQL instance, it won’t be updated atomically. So we would need a way of detecting and fixing inconsistencies in the cross-shard index. Note that such inconsistencies could also happen with vertical sharding if there is a foreign key relationship between two tables that live in different MySQL instances.

How many shards to create

As mentioned above, queries that look up rows using a non-sharding column may need to examine multiple shards. We decided against creating a large number of shards in order to reduce the penalty from examining multiple shards. The question of how many shards to create is closely related to shard size.

If the sharding column is an auto-increment column then most of the growth of the table will occur in the “last” shard. In such cases, we target a shard size of a few hundred GB for the other shards and a very small initial size for the last shard during the initial sharding of the table. If the sharding column is not an auto-increment column then all shards of the table are likely to grow over time. In this case we targeted a shard size of under 100GB for the shards during the initial sharding. In both of the above cases we analyzed the Hive dumps of each table in order to choose reasonable shard boundaries.
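This post does not describe how boundaries were derived from the dumps. One simple possibility, shown purely as an assumption, is to walk the sorted sharding-key values and cut a boundary every time roughly one target shard's worth of rows has been passed. The function name and the average-row-size estimate are hypothetical.

```python
def choose_boundaries(sorted_keys, avg_row_bytes, target_shard_bytes):
    """Pick shard boundaries so each shard holds roughly target_shard_bytes.

    `sorted_keys` is the sharding-key value of every row (or of a uniform
    sample of rows), in ascending order. Each returned boundary b starts a
    new shard: keys < b stay in the previous shard, keys >= b go to the next.
    """
    rows_per_shard = max(1, target_shard_bytes // avg_row_bytes)
    boundaries = []
    for i in range(rows_per_shard, len(sorted_keys), rows_per_shard):
        key = sorted_keys[i]
        # Skip repeated keys so that boundaries are strictly increasing.
        if not boundaries or key > boundaries[-1]:
            boundaries.append(key)
    return boundaries
```

For an auto-increment sharding column, the same idea applies to the existing data, with the final boundary placed near the current maximum so that the last shard starts out almost empty.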

Migrating from an unsharded table to a sharded table

Next we’ll walk through the process of migrating from an unsharded table to a sharded one. We will set up the sharded table on a different partition than the unsharded table. Initially, all shards of the table would be put on the same partition though over time different shards of a table may get moved to different partitions.

These are the steps we follow during the migration:

  1. Data copy phase
    1. We read from the source table in batches and copy to the shards.
    2. We set up N threads for N shards. Each thread copies data to one shard. We can rate limit the copying to control for replication lag.
    3. Before starting the copy, we make a note of the current binlog position.
  2. Binlog replay phase
    1. Once the initial data copy is done, we can start replaying binlogs from the position noted above. The replay phase is very similar to what happens during the table move with the exception that we need to know which shard to replay on.
    2. We may encounter and need to ignore errors about duplicate data during inserts. This is because we may replay some operations that are already in the destination shards as we’re starting the replay from the binlog position at the time the copy started. Note that we use row-level binlogs rather than statement-level binlogs.
  3. Extended dark read testing phase
    1. We send shadow traffic to the sharded table in order to compare the results. We built a shadow_select() API that will call select() on the unsharded table and sharded_select() on the sharded table for the comparison.
    2. A small % of mismatches may occur due to lag in the replay.
  4. Extended dark write testing phase
    1. We stop the binlog replay and start doing dark writes for testing. We have similar APIs for writing, such as shadow_insert(), that call insert() on the unsharded tables and sharded_insert() for the sharded tables. If the sharding is on an auto-increment column, we can choose whether to use the same auto-increment value or an independently generated auto-increment value. In the latter case, data will diverge quickly between the unsharded and sharded tables.
    2. A small % of mismatches may occur due to the lack of a 2-phase commit between the unsharded and sharded tables. This is another minor source of potential data divergence.
  5. Fresh copy phase
    1. If we are happy with the dark traffic testing then we will restart with a fresh copy of the data. This is a repeat of the first step above. The fresh copy is done because the data may have diverged between the sharded and unsharded tables during the dark write testing due to the lack of a 2-phase commit.
  6. Binlog replay phase
    1. Repeat of the second step above.
  7. Short dark read testing phase
    1. This is similar to the third step above for a short time as a sanity check.
    2. We do not do a second dark write testing phase to prevent data divergence between the sharded and unsharded tables.
  8. Cutover phase
    1. This involves one manual step to rename the unsharded table. Once it is renamed, the reads and writes to the unsharded table will stop. After the rename, we also make a note of the current binlog position in the destination partition for the next phase. The replay script will see the renamed table in the binlog and know that all updates have been propagated to the sharded table. It then updates ZK to indicate that the sharded table is the source of truth.
  9. Reverse replay phase
    1. We start replaying binlogs to propagate all changes to all shards of the table back to the unsharded table.
    2. This is just in case we ever needed to switch back to the unsharded table.

Just as during the table move process, there is a small window of “table not found” errors during the cutover step.
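The dark read comparison in phase 3 can be sketched as follows. This is a hedged illustration rather than the actual shadow_select(): the two query functions and the on_mismatch callback are passed in as parameters so the example is self-contained, and the choice to swallow errors from the sharded path is our assumption.

```python
def shadow_select(select_fn, sharded_select_fn, sharding_col, sharding_key,
                  on_mismatch, **query):
    """Serve the read from the unsharded table and compare with the sharded one."""
    expected = select_fn(**query)
    try:
        actual = sharded_select_fn(
            sharding_col=sharding_col, sharding_key=sharding_key, **query
        )
    except Exception as exc:
        # Dark traffic must never break the real request; just record the failure.
        on_mismatch(query, expected, exc)
        return expected
    if actual != expected:
        on_mismatch(query, expected, actual)
    return expected
```

Callers always receive the result from the unsharded table, so the sharded table can be wrong or unavailable during testing without affecting users. The mismatch callback is where the small percentage of replay-lag differences would show up.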

What to do if a shard grows very large

Last but not least is the question of what to do if a shard grows very large. It turns out that splitting a shard is simpler than the initial sharding of the table!

The idea is to leverage MySQL's EXCHANGE PARTITION command (19.3.3 Exchanging Partitions and Subpartitions with Tables), which can be used to “instantly” swap a partition of a table with another unpartitioned table.

Suppose that there is a shard T_shard_x_y that holds values >= x and < y, and we want to split it at some mid-point z.

1. We first use the Percona online schema change tool to convert the shard to a partitioned table with two partitions P0 and P1, such that P0 holds values < z and P1 holds values >= z. This is just like any other schema change for an unsharded table.

2. Rename T_shard_x_y and stop all reads/writes to it.

3. Create 2 empty shards, T_shard_x_z and T_shard_z_y, and use the EXCHANGE PARTITION command to swap P0 with T_shard_x_z and P1 with T_shard_z_y.

4. Update the shard metadata in ZK to atomically remove T_shard_x_y while adding both T_shard_x_z and T_shard_z_y. This resumes traffic to the child shards.

A special case is when a table is sharded on an auto-increment column, as most of the growth occurs only in the last shard. In this case it is much simpler to split the last shard T_shard_x_MAX at the end rather than the middle. This will keep all existing data in T_shard_x_z and make a new empty T_shard_z_MAX shard. We can skip the expensive online schema change and there is no exchange partition involved.
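Steps 1 to 3 of the general split can be sketched as a function that produces the logical DDL in order. Several things here are assumptions for illustration: the sharding column is an integer type (so plain RANGE partitioning applies), the "__split" suffix for the renamed shard is made up, and in practice the first statement would be run through the online schema change tool rather than as a direct ALTER. Step 4, the ZK metadata update, is not shown.

```python
def split_shard_plan(table, column, x, y, z):
    """Return (SQL statements in order, names of the two child shards)."""
    old = f"{table}_shard_{x}_{y}"
    parked = f"{old}__split"  # hypothetical name for the renamed parent shard
    left = f"{table}_shard_{x}_{z}"
    right = f"{table}_shard_{z}_{y}"
    upper = "MAXVALUE" if y == "MAX" else f"({y})"
    statements = [
        # Step 1: partition the shard at z (an online schema change in practice).
        f"ALTER TABLE {old} PARTITION BY RANGE ({column}) "
        f"(PARTITION P0 VALUES LESS THAN ({z}), PARTITION P1 VALUES LESS THAN {upper})",
        # Step 2: rename the shard so that reads and writes to it stop.
        f"RENAME TABLE {old} TO {parked}",
    ]
    for partition, child in (("P0", left), ("P1", right)):
        statements += [
            # Step 3: create an empty, unpartitioned child and swap the partition in.
            f"CREATE TABLE {child} LIKE {parked}",
            f"ALTER TABLE {child} REMOVE PARTITIONING",
            f"ALTER TABLE {parked} EXCHANGE PARTITION {partition} WITH TABLE {child}",
        ]
    return statements, [left, right]
```

CREATE TABLE ... LIKE copies the partitioning of the parent, so the sketch removes it from each child before the exchange, since the exchange target has to be an unpartitioned table.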

Current Status and Future Work

We have currently sharded three large production tables. Because of the challenges related to sharding large tables we’re always looking for ways to simplify and streamline the process for other use cases. We are considering migrating some of our HBase use cases to MyRocks sharded tables. Our implementation of sharded tables on MySQL should mostly work for MyRocks.

We are also considering implementing a graph storage service similar to Facebook TAO or Pinterest Zen in order to hide the complexity of sharding from product teams. The new tables created for nodes and edges would be sharded tables and they would reuse a lot of ideas and code from what we described above.

Acknowledgements

We thank the Vitess team (Vitess) for their presentations, documents, etc. on how Vitess does sharding.

We thank Siyuan Fu for implementing vertical sharding, and Jelle Zijlstra for changing database APIs to no longer take SQL text as input.

We thank Martin Michelsen and Siyuan Fu for many discussions on the design of horizontal sharding.

We thank Jianzhou Zhao for assisting with exploring some enhancements to sharding.

We thank our head of Infrastructure, Michael Fu, for supporting this project.

We thank other team members Chao Chen, Liangchen Zheng and Jianzhou Zhao for their comments and suggestions, especially relating to the future work.

Last but not least, we thank Aaron Waterman for reviewing this post, making edits, and offering suggestions to make it more presentable.

