Monday 28 March 2011

MySQL Cluster online scaling

Most people looking at a diagram of the Cluster architecture soon want to know whether the system can scale online. API nodes such as MySQLD processes can be added online, and the storage capacity of existing data nodes can be increased online, but for a long time it was not possible to add new data nodes to the cluster without an initial system restart requiring a backup and restore.

An online add node and data repartitioning feature was finally implemented in MySQL Cluster 7.0. It's not clear how often users actually do scale their Clusters online, but it certainly is a cool thing to be able to do.

There are two parts to the feature :
  1. Online add an empty data node to an existing cluster
  2. Online rebalance existing data across the existing and new data nodes

Adding an empty data node to a cluster sounds trivial, but is actually fairly complex given the cluster's distributed configuration, ring heartbeating etc. Stewart Smith did some preparatory work on this a few years ago, and this was revisited for the feature.

Rebalancing existing table data to make use of the new storage capacity is more challenging. How does this work? More importantly, how does this work online, while transactions are starting and committing, and queries are running?

As an aside, the definition of 'Online' used here is that multiple distributed clients continue to start and commit transactions reading and writing data to the cluster. Some things, like concurrent DDL, may be blocked during these operations.

To understand the online data rebalancing mechanism, we need to go into more detail on the native data distribution mechanisms introduced in the last post.

Ndb native partitioning variants

There are currently three variants of Ndb's native partitioning function :
  • Linear Key
  • Key
  • HashMap
Linear Key is used where the table is created with PARTITION BY LINEAR KEY. Key is used where the table is created with PARTITION BY KEY prior to Cluster 7.0. HashMap is used where the table is created with PARTITION BY KEY in releases starting at Cluster 7.0.

These partitioning functions can be decomposed into two functions, where the first, an MD5 hash of the partition/distribution key, is fixed. MD5 is no longer considered secure, but for the purpose of 'balancing' rows across fragments it is more than adequate.

The variant part of these algorithms is how the MD5 hash of the key is mapped to a partition/fragment number. This has important implications for how repartitioning works.

fragment_num( dist_key_cols ) = mapping_fn( md5 ( dist_key_cols ) )

Linear Key Mapping Function


The Linear Key scheme is part of an old design for online repartitioning. The old design was intended to minimise the amount of data transfer required when repartitioning tables by ensuring that an existing partition could be cleanly 'split in two' so that half of its data could be migrated. This is a good policy, but Linear Key had the downside of requiring a power-of-2 number of partitions.

Where a non power-of-2 number of data nodes existed, this caused problems. Additionally, if repartitioning with this scheme had ever been implemented, it would have required fragments to be 'split in two' to expand the system. The power-of-2 requirement of this scheme was listed as a Cluster limitation in the early days, though this has not been a problem since the non-linear Key scheme became the default.
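To make the 'split in two' idea concrete, here is a minimal Python sketch. It is not taken from the Ndb source: the bit-mask mapping and the function names (linear_fragment, split_targets) are assumptions chosen only to show why a power-of-2 fragment count lets each fragment split cleanly when the count doubles.

```python
def linear_fragment(hash_value, num_frags):
    """Map a hash to a fragment with a bit mask (assumed scheme, power-of-2 only)."""
    if num_frags < 1 or num_frags & (num_frags - 1):
        raise ValueError("num_frags must be a power of 2")
    return hash_value & (num_frags - 1)


def split_targets(old_frag, old_num_frags):
    """After doubling the fragment count, rows of old_frag land in one of these two."""
    return (old_frag, old_frag + old_num_frags)


# Using the same example hash value as elsewhere in this post.
h = 23
before = linear_fragment(h, 4)
after = linear_fragment(h, 8)
print(before, after, split_targets(before, 4))
```

With this kind of mapping, a row either stays where it is or moves to exactly one known new fragment, which is the property the old design was after.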

Key Mapping Function

The Key scheme is simpler than Linear Key: it takes the hash result modulo the number of fragments of the table to determine which fragment a row resides in. This removes the power-of-2 restriction that Linear Key required, so rows can be evenly balanced across any number of nodes. However, it is not amenable to online reorganisation, as changing the number of table fragments changes the modulo divisor, which can result in most of the resulting partition values changing. This means that a reorganisation using this scheme could result in excessive data transfer.

e.g.

mapping_fn_key(x) = x % num_frags

md5(dist_key_cols) = 23

num_frags = 4, 23 % 4 = 3
num_frags = 6, 23 % 6 = 5
num_frags = 8, 23 % 8 = 7


In this example the row changes fragment at every expansion step. As the system expands, far more data is moved than is needed to rebalance it. This is expensive, slow, and requires extra storage.
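The cost of the modulo scheme can be estimated with a few lines of Python. This is only a sketch: it assumes hash values are uniformly distributed, and it uses a small hash_space of 240 (an assumption, picked because it is a multiple of the fragment counts involved, so the fractions come out exact) in place of the real 32-bit hash range.

```python
from fractions import Fraction


def moved_fraction(old_frags, new_frags, hash_space=240):
    """Fraction of hash values whose fragment changes under x % num_frags."""
    moved = sum(1 for x in range(hash_space) if x % old_frags != x % new_frags)
    return Fraction(moved, hash_space)


print(moved_fraction(4, 6))  # 2/3
print(moved_fraction(6, 8))  # 3/4
```

For comparison, filling two new fragments evenly only requires moving 2/6 of the data when going from 4 to 6 fragments, and 2/8 when going from 6 to 8.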

HashMap Mapping Function

In MySQL Cluster 7.0, the HashMap distribution scheme was added and became the default. It is used when PARTITION BY KEY() is explicitly given, or implied if no partitioning specification is given.
The HashMap scheme uses a mapping table from md5 hash result to fragment number. The hash result used is 32 bits wide, which would require a very large lookup table, so we first shrink it down to something more manageable by taking it modulo n. n = 240 is the default, though the implementation supports any modulo value.
The resulting number is then used as an index into the lookup table to get the id of the fragment which will store the row.

e.g.

mapping_fn_hashmap(x) = lookup_tab [x % mod_val ]

fragment_number = lookup [ md5( dist_key_cols ) % mod_val ]

fragment_number = lookup [ md5( dist_key_cols ) % 240 ]

The lookup table adds another layer of indirection between the hash result (which is fixed for any given key), and the fragment number, whose range can increase over time.

Assuming mod_val is 240, and we start with 4 fragments, then the 240 entry lookup table will have 60 entries with 0, 60 with 1, 60 with 2 and 60 with 3. As an aside, these will be sequenced as 0,1,2,3,0,1,2,3,0.... so that the actual default distribution will be exactly the same as with the KEY scheme.

If we want to spread the table data over 6 fragments, then we can change the table to use a new hashmap lookup table, where 2/6 of the existing 240 values are changed to refer to the new fragment numbers. The other 4/6 are unchanged. Expanding again to 8 fragments, we can change to another new hashmap, where 2/8 of the existing 240 values are changed, and the other 6/8 are unchanged. In each case, the minimum amount of data is affected to maintain balance.

Changing the hashmap is easy. The real work is in moving the data while it's being operated upon, but what the hashmap gives is a way to move only the minimum amount of data required when adding nodes. Only the data that has to move is moved, and the rest stays where it is. The data distribution randomisation given by the MD5 function is unaffected, so system balance is maintained.
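The lookup table expansion described above can be sketched in Python as follows. This is an illustration under stated assumptions, not Ndb's algorithm: the function names are hypothetical, and the choice of which table entries get reassigned is arbitrary here (only the counts matter when hash values are uniform).

```python
from collections import Counter

MOD_VAL = 240  # default hashmap size mentioned in the text


def initial_hashmap(num_frags, mod_val=MOD_VAL):
    """Round-robin table 0,1,2,...,0,1,2,... giving the same placement as Key."""
    return [i % num_frags for i in range(mod_val)]


def expand_hashmap(old_map, new_num_frags):
    """Return a balanced table over new_num_frags, reassigning only surplus entries."""
    base, extra = divmod(len(old_map), new_num_frags)
    # Target number of entries per fragment; the first `extra` get one more.
    target = [base + (1 if f < extra else 0) for f in range(new_num_frags)]
    counts = Counter(old_map)
    new_map = list(old_map)
    for i, frag in enumerate(old_map):
        if counts[frag] > target[frag]:
            # Hand this entry to the first fragment that is still under target.
            dest = next(f for f in range(new_num_frags) if counts[f] < target[f])
            new_map[i] = dest
            counts[frag] -= 1
            counts[dest] += 1
    return new_map


four = initial_hashmap(4)
six = expand_hashmap(four, 6)
changed = sum(1 for a, b in zip(four, six) if a != b)
print(changed, sorted(Counter(six).values()))
```

Going from 4 to 6 fragments changes 80 of the 240 entries (2/6), and every fragment ends up with 40 entries. A row is then placed with something like six[hash_value % MOD_VAL].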

Choice of HashMap mod_val

As the default mod_val of 240 is significantly higher than common fragment counts, and because it factors well, most configurations will remain well balanced, despite being reorganised.

e.g. Assuming data nodes are added in 2-node increments (the minimum with NoOfReplicas=2), the number of lookup table entries per fragment is :
240 / 2 = 120
240 / 4 = 60
240 / 6 = 40
240 / 8 = 30
240 / 10 = 24
240 / 12 = 20
240 / 14 = 17.1
240 / 16 = 15
240 / 18 = 13.3
240 / 20 = 12
240 / 22 = 10.9
240 / 24 = 10

240 divides cleanly into a whole number of lookup table entries for most of these fragment counts, meaning that data should be well balanced across the table fragments when the data is repartitioned. Where there is not an integer result (e.g. 240/14), we would have most partitions with 17 lookup entries, and two with 18 lookup entries. The imbalance between them would be (18/17)-1, or about 6%. If this were problematic, then a different mod_val could be used. A higher mod_val gives smaller partition imbalances, but requires more memory to store. If necessary, the lookup table could be expanded in size by any integer factor (e.g. 2, 3, 4...) online to make it large enough to factor better for some desired data node count.
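The arithmetic above is easy to check for other fragment counts or mod_val choices. The helper names below are hypothetical, and the sketch assumes lookup entries are spread as evenly as possible across fragments.

```python
def entries_per_fragment(mod_val, num_frags):
    """Smallest and largest number of lookup entries any one fragment gets."""
    if not 0 < num_frags <= mod_val:
        raise ValueError("need 0 < num_frags <= mod_val")
    low, extra = divmod(mod_val, num_frags)
    high = low + 1 if extra else low
    return low, high


def imbalance(mod_val, num_frags):
    """Relative excess of the fullest fragment over the emptiest one."""
    low, high = entries_per_fragment(mod_val, num_frags)
    return high / low - 1


for frags in (12, 14, 18, 22):
    print(frags, entries_per_fragment(240, frags), round(imbalance(240, frags), 3))
```

For example, imbalance(240, 14) is 1/17, and doubling the table to 480 entries brings the same 14-fragment case down to 1/34.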


Moving rows online

The HashMap gives fine grained control of data placement, but how does the reorg happen online?

Table reorganisation is similar to node recovery in some ways, in that the data is copied via fragment scans of the existing fragments, while at the same time, synchronous triggers are used to forward changes made to the existing fragment rows. The triggers and scans only copy data for rows which are to be moved,

e.g. where

new_hashmap_lookup[ md5( dist_key_cols ) % 240 ]
!=
old_hashmap_lookup[ md5( dist_key_cols ) % 240 ]


Other rows are left where they are.
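The 'which rows move' test can be written as a small Python filter. This is a sketch only: md5_32 is a hypothetical stand-in that hashes repr(key) and keeps the first four bytes, which is an assumption and not how Ndb derives its 32-bit value from the distribution key.

```python
import hashlib


def md5_32(key):
    """Hypothetical 32-bit hash of a distribution key (not Ndb's encoding)."""
    digest = hashlib.md5(repr(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def rows_to_move(keys, old_map, new_map, hash_fn=md5_32):
    """Keep only the keys whose fragment differs between the two hashmaps."""
    mod_val = len(old_map)
    return [k for k in keys
            if new_map[hash_fn(k) % mod_val] != old_map[hash_fn(k) % mod_val]]


# Tiny 4-entry tables and an identity hash, just to make the result easy to follow.
old_map = [0, 1, 0, 1]
new_map = [0, 1, 2, 3]
print(rows_to_move(range(8), old_map, new_map, hash_fn=lambda k: k))
```

With the identity hash, keys 2, 3, 6 and 7 are selected, because only table entries 2 and 3 changed.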

With this mechanism, the new fragments are populated with rows from the existing fragments while read and write transactions continue. Once the fragment scans complete, the new fragments continue to be maintained by the synchronous triggers.

A future GCP boundary is chosen to be the 'cutover' point, and at this GCI, the new HashMap starts being used for new transaction processing, and the new fragments start being used. Triggers are set up to propagate changes from the new fragments back to the pre-existing fragments, so that any older transactions using the old hashmap definition will see consistent data changes.

Once all transactions using rows from the pre-existing fragments have committed, the synchronous triggers are dropped, and the pre-existing fragments are scanned again, deleting the moved rows. Once this step completes, the reorganisation is done.

Primary and Unique key operations in Ndb are short lived, and at Hashmap cutover, it doesn't take long until all old operations have committed. However, ordered index and table scans are slower and may not complete for some time. Both old and new row copies are maintained until all scans started using the old distribution have completed, so that ongoing transactions need not be aborted as part of the online reorg.

At the same time as the hashmaps are cut over at a GCI boundary, any NdbApi event subscribers listening to data change events on the table, for example attached MySQLDs recording Binlogs, start receiving events for the moved rows from the new fragments, and stop receiving them from the old fragments.

Transient storage use

When adding data nodes, the reorganisation uses no extra storage space on existing data nodes. On new data nodes, only the space used for the moved data is used. After the reorganisation completes, the space formerly used on pre-existing data nodes can be used for new data, so the system capacity is increased.

Transactional behaviour

Table reorganisation can take some time when there is a lot of data to move. A node or cluster failure during a reorganisation could leave the system in a transient state which would be difficult to recover from. One of the internal infrastructure changes in Cluster 7.0 was making all DDL operations transactional. This means that they are atomic w.r.t. failures, including node and system failures. This applies to CREATE/DROP/ALTER of TABLE/INDEX/TABLESPACE etc.
This also applies to table reorganisation as it is a form of ALTER TABLE. If the reorganisation fails, or a node fails, or the cluster fails at some point during the reorganisation, then as part of system recovery, the reorganisation will be rolled back, or completed, if it had committed at the time of failure.

So that covers online table reorganisation. I've been meaning to write about it for some time, though somehow these entries always seem to be more like adverts than technical info.

Saturday 26 March 2011

Data distribution in MySQL Cluster

MySQL Cluster distributes rows amongst the data nodes in a cluster, and also provides data replication. How does this work? What are the trade offs?

Table fragments

Tables are 'horizontally fragmented' into table fragments each containing a disjoint subset of the rows of the table. The union of rows in all table fragments is the set of rows in the table. Rows are always identified by their primary key. Tables with no primary key are given a hidden primary key by MySQLD.

By default, one table fragment is created for each data node in the cluster at the time the table is created.

Node groups and Fragment replicas

The data nodes in a cluster are logically divided into Node groups. The size of each Node group is controlled by the NoOfReplicas parameter. All data nodes in a Node group store the same data. In other words, where the NoOfReplicas parameter is two or greater, each table fragment has a number of replicas, stored on multiple separate data nodes in the same nodegroup for availability.

One replica of each fragment is considered primary, and the other(s) are considered backup replicas. Normally, each node contains a mix of primary and backup fragments for every table, which encourages system balance.

Which replica to use?

The primary fragment replica is used to serialise locking between transactions concurrently accessing the same row. Write operations update all fragment replicas synchronously, ensuring no committed data loss on node failure. Read operations normally access the primary fragment replica, ensuring consistency. Reads with a special lock mode can access the backup fragment replicas.

Primary key read protocol

When an NdbApi client (for example a MySQLD process) wants to read a row by primary key, it sends a read request to a data node acting as a Transaction Coordinator (TC).
The TC node will determine which fragment the row would be stored in from the primary key, decide which replica to access (usually the primary), and send a read request to the data node containing that fragment replica. The data node containing the fragment replica then sends the row's data (if present) directly back to the requesting NdbApi client, and also sends a read acknowledgement or failure notification back to the TC node, which also propagates it back to the NdbApi client.

Minimising inter data node hops

The 'critical path' for this protocol in terms of potential inter-data-node hops is four hops :

Client -> TC -> Fragment -> TC -> Client

To minimise the latency experienced by a remote client, two of these inter-node hops can ideally be avoided by having the TC and the fragment replica on the same node. This requires controlling the choice of node for TC based on the primary key of the data which will be read. Where a transaction only reads rows stored on the same node as its TC, this can improve latency and system efficiency.
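The hop counting can be made explicit with a toy Python model. The node names are made up for illustration, and the model simply counts a hop whenever consecutive steps of the protocol run on different machines.

```python
def network_hops(path):
    """Count steps in the message path that cross between different machines."""
    return sum(1 for a, b in zip(path, path[1:]) if a != b)


# Client -> TC -> Fragment -> TC -> Client, expressed as the machine at each step.
tc_remote = ["client", "node1", "node2", "node1", "client"]  # TC and replica apart
tc_local = ["client", "node2", "node2", "node2", "client"]   # TC on the replica's node

print(network_hops(tc_remote), network_hops(tc_local))  # 4 2
```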

Distribution awareness

From NdbApi, users can specify a table and key when starting a transaction. The transaction will then choose a TC data node based on where the corresponding row's primary fragment replica is located in the system. This mechanism is sometimes referred to as 'transaction hinting'.

The Ndb handler in MySQLD generally waits for the first primary key lookup in a user session before starting an NdbApi transaction, so that it can choose a TC node based on this. This is a best-effort attempt at having the data node acting as TC colocated with the accessed data. This feature is usually referred to as 'Distribution Awareness'.

Write operations also benefit from distribution awareness, but not to the same extent in systems with NoOfReplicas > 1. Write operations must update all fragment replicas, which must be stored on different nodes, in the same nodegroup, so for NoOfReplicas > 1, distribution awareness avoids inter-node-group communication, and some intra-node-group communication, but some inter-data-node communication is always required. In a system with good data partitioning and distribution awareness, most read transactions will access only one data node, and write transactions will result in messaging between the data nodes of a single node group. Messaging between node groups will be minimal.

Distribution keys

By default, the whole of a table's primary key is used to determine which fragment replica will store a row. However, any subset of the columns in the primary key can be used. The key columns used to determine the row distribution are called the 'distribution key'.

Where a table's primary key contains only one column, the distribution key must be the full primary key. Where the primary key has more than one column, the distribution key can be different to (a subset of) the primary key.

From MySQLD, a distribution key can be set using the normal PARTITION BY KEY() syntax. The effect of using a distribution key which is a subset of the primary key is that rows with different primary key values, but the same distribution key values are guaranteed to be stored in the same table fragment.

For example, if we create a table :

CREATE TABLE user_accounts (user_id BIGINT,
account_type VARCHAR(255),
username VARCHAR(60),
state INT,
PRIMARY KEY (user_id, account_type))
engine = ndb partition by key (user_id);


Then insert some rows :


INSERT INTO user_accounts VALUES (22, "Twitter", "Bader", 2),
(22, "Facebook", "Bd77", 2),
(22, "Flickr", "BadB", 3),
(23, "Facebook", "JJ", 2);


Then we know that all rows with the same value(s) for the distribution key (user_id), will be stored on the same fragment. If we know that individual transactions are likely to access rows with the same distribution key value then this will increase the effectiveness of distribution awareness. Many schemas are 'partitionable' like this, though not all.
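A short Python sketch shows why this holds: only the distribution key columns feed the hash, so the other primary key columns cannot affect placement. The hashing of repr(key), the plain modulo mapping and NUM_FRAGS = 4 are all assumptions made for illustration, not Ndb's actual encoding or mapping.

```python
import hashlib

NUM_FRAGS = 4  # hypothetical fragment count


def fragment_for(row, dist_key_cols, num_frags=NUM_FRAGS):
    """Pick a fragment from the distribution key columns only."""
    key = tuple(row[c] for c in dist_key_cols)
    digest = hashlib.md5(repr(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_frags


accounts = [
    {"user_id": 22, "account_type": "Twitter"},
    {"user_id": 22, "account_type": "Facebook"},
    {"user_id": 22, "account_type": "Flickr"},
]
frags = {fragment_for(r, ["user_id"]) for r in accounts}
print(len(frags))  # 1: all rows for user 22 share a fragment
```

If the full primary key (user_id, account_type) were used as the distribution key instead, the three rows could each land on a different fragment.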

Note that partitioning is a performance hint in Ndb - correctness is not affected in any way, and transactions can always span table fragments on the same or different data nodes. This allows applications to take advantage of the performance advantages of distribution awareness without requiring that all transactions affect only one node etc as required by simpler 'sharding' mechanisms.

Correlated distribution keys across tables

A further guarantee from Ndb is that two tables with the same number of fragments, and the same number and type of distribution keys will have rows distributed in the same way.

For example, if we add another table :


CREATE TABLE user_prefs (user_id BIGINT,
type VARCHAR(60),
value VARCHAR(255),
PRIMARY KEY (user_id, type))
engine = ndb partition by key (user_id);


Then insert some rows :


INSERT INTO user_prefs VALUES (22, "Coffee", "Milk + 6 sugars"),
(22, "Eggs", "Over easy"),
(23, "Custard", "With skin");


Then we know that the rows with the same user_id in the user_prefs and user_accounts tables will be stored on the same data node. Again, this helps with distribution awareness. In this example, we are ensuring that rows related to a single user, as identified by a common user_id, will be located on one data node, maximising system efficiency, and minimising latency.

Ordered index scan pruning

MySQL Cluster supports arbitrary ordered indexes. Ordered indexes are defined on one or more columns and support range scan operations. Range scans are defined by supplying optional lower and upper bounds. All rows between these bounds are returned.

Each Ndb ordered index is implemented as a number of in memory tree structures (index fragments), distributed with the fragments of the indexed table. Each index fragment contains the index entries for the local table fragment. Having ordered indexes local to the table fragments makes index maintenance more efficient, but means that there may not be much locality to exploit when scanning as rows in a range may be spread across all index fragments of an index.

The only case where an ordered index scan does not need to scan all index fragments is where it is known that all rows in the range will be found in one table fragment.
This is the case where both :
  1. The ordered index has all of the table's distribution keys as a prefix
  2. The range is contained within one value of the table's distribution keys

NdbApi detects this case when a range scan is defined, and 'prunes' the scan to one index fragment (and therefore one data node). For all other cases, all index fragments must be scanned.

Continuing the example above, assuming an ordered index on the primary key, the following ordered index scans can be pruned :


SELECT * FROM user_accounts WHERE user_id = 22;
SELECT * FROM user_accounts WHERE user_id = 22 AND account_type LIKE 'F%';


However, the following ordered index scans cannot be pruned, as matching rows are not guaranteed to be stored in one table fragment :


SELECT * FROM user_accounts WHERE account_type = "Facebook";
SELECT * FROM user_accounts WHERE user_id > 20 AND user_id < 30;
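The two pruning conditions can be expressed as a small Python predicate. This is a simplified model, not NdbApi code: can_prune and its arguments are hypothetical, and 'the range is contained within one value' is modelled as every distribution key column having an equality bound.

```python
def can_prune(index_cols, dist_key_cols, equality_cols):
    """True if a range scan can be limited to a single index fragment."""
    n = len(dist_key_cols)
    # 1. The index starts with all of the distribution key columns.
    has_prefix = set(index_cols[:n]) == set(dist_key_cols)
    # 2. Every distribution key column is pinned to one value by the bounds.
    single_value = set(dist_key_cols) <= set(equality_cols)
    return has_prefix and single_value


index = ["user_id", "account_type"]
dist_key = ["user_id"]

print(can_prune(index, dist_key, {"user_id"}))       # user_id = 22
print(can_prune(index, dist_key, {"account_type"}))  # account_type = "Facebook"
print(can_prune(index, dist_key, set()))             # user_id > 20 AND user_id < 30
```

Only the first call returns True, matching the prunable and non-prunable queries above.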


MySQLD partitioning variants and manually controlling distribution

Since MySQL 5.1, table partitioning has been supported. Tables can be partitioned based on functions of the distribution keys such as :

  • KEY
  • LINEAR KEY
  • HASH
  • RANGE
  • LIST

For engines other than Ndb, partitioning is implemented in the Server, with each partition implemented as a separate table in the Storage engine. Ndb implements these partition functions natively, using them to control data distribution across table fragments in a single table.

From Ndb's point of view, KEY and LINEAR KEY are native partitioning functions. Ndb knows how to determine which table fragment to use for a row from a table's distribution key, based on an MD5 hash of the distribution key.

HASH, RANGE and LIST are not natively supported by Ndb. When accessing tables defined using these functions, MySQLD must supply information to NdbApi to indicate which fragments to access. For example before primary key insert, update, delete and read operations, the table fragment to perform the operation on must be supplied. From MySQLD, the partitioning layer supplies this information.

Any NdbApi application can use the same mechanisms to manually control data distribution across table fragments. At the NdbApi level this is referred to as 'User Defined' partitioning. This feature is rarely used. One downside of using User Defined partitioning is that online data redistribution is not supported. I'll discuss Online data redistribution in a future post here.


Edited on 12/10/11 to fix formatting imbalance