Data Modelling with Cassandra
By Sam Overton
7 Nov 2011
Category: Technical Articles
Denormalisation is essential at scale, and Cassandra's read/write tradeoff is well-adapted for it. In this article we work through an example use case showing how this works in practice.
I will consider an asymmetric messaging application and go through the steps explaining how such a system might be implemented in Cassandra, and the properties of Cassandra that we can exploit in order to design a performant system.
Asymmetric messaging is a messaging model made famous by Twitter's follower model, but it can represent any publish-subscribe system in which messages are broadcast from one user to one or more other users without the sender having to specify the recipients explicitly.
Relational Model
In our model we have three entities: Users, Edges and Messages. A standard normalised relational model might look something like this:
User(id) <-1--*-> Edge(follower_id, followee_id) <-1--*-> Message(id, user_id, msg)
An edge is a directional link representing the asymmetric (follower) relationship between two users.
The operations we will allow are subscribe (add an edge), broadcast (add a message), and read-timeline (list all messages from users that a given user follows). In our relational model, these operations would be implemented as follows:
subscribe
User A subscribes to user B:
INSERT INTO Edge (follower_id, followee_id) VALUES ('A', 'B')
broadcast
User B broadcasts message M:
INSERT INTO Message (user_id, msg) VALUES ('B', 'M')
read-timeline
User A reads all messages from users to whom he is subscribed:
SELECT m.user_id, m.msg FROM Message m, Edge e WHERE e.follower_id = 'A' AND m.user_id = e.followee_id
We can see that with this schema we must perform a JOIN on the Edge and Message tables in order to retrieve the timeline for a given user.
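To make the relational model concrete, here is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for the relational database. The helper names subscribe, broadcast and read_timeline are hypothetical. The ORDER BY clause is an addition so that the output is deterministic. The read path is the same JOIN as the query above.

```python
import sqlite3

# An in-memory SQLite database standing in for the relational store.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE User (id TEXT PRIMARY KEY);
    -- The composite primary key also serves lookups by follower_id.
    CREATE TABLE Edge (
        follower_id TEXT REFERENCES User(id),
        followee_id TEXT REFERENCES User(id),
        PRIMARY KEY (follower_id, followee_id)
    );
    CREATE TABLE Message (
        id INTEGER PRIMARY KEY,
        user_id TEXT REFERENCES User(id),
        msg TEXT
    );
    CREATE INDEX message_user_id ON Message(user_id);
""")

def subscribe(follower, followee):
    conn.execute("INSERT INTO Edge (follower_id, followee_id) VALUES (?, ?)",
                 (follower, followee))

def broadcast(author, msg):
    conn.execute("INSERT INTO Message (user_id, msg) VALUES (?, ?)", (author, msg))

def read_timeline(user):
    # The JOIN: find the user's followees in Edge, then their rows in Message.
    return conn.execute(
        "SELECT m.user_id, m.msg FROM Message m, Edge e "
        "WHERE e.follower_id = ? AND m.user_id = e.followee_id "
        "ORDER BY m.id",
        (user,),
    ).fetchall()

conn.executemany("INSERT INTO User (id) VALUES (?)", [("A",), ("B",), ("C",)])
subscribe("A", "B")
subscribe("A", "C")
broadcast("B", "hello from B")
broadcast("C", "hello from C")
broadcast("A", "a post by A")
print(read_timeline("A"))  # [('B', 'hello from B'), ('C', 'hello from C')]
```

Writes are cheap single-row inserts. Each timeline read, however, has to chase every followee's messages through the index.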
Performing a table join is expensive because it requires random disk I/O. Assume that we have indexes on both Edge.follower_id and Message.user_id, that user A follows F users, and that each followee has M messages on average. We will then need:
* O(1) seeks to look up A in the Edge index and obtain the users he follows;
* O(F) seeks to look up the messages from each followee in the Message index;
* O(FM) seeks to retrieve the messages themselves.
The O(FM) term dominates, and the timeline contains FM messages. To populate a timeline, therefore, we do approximately one seek per message on average.
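The seek count above can be written as a one-line cost model. This is a rough sketch that treats every constant hidden inside the O(...) terms as 1. The function name join_seeks and the example values F = M = 10 are hypothetical.

```python
def join_seeks(followees: int, msgs_per_followee: int) -> int:
    """Approximate random seeks for one relational read-timeline.

    One seek to find the user in the Edge index, one per followee in the
    Message index, and one per message to fetch the row itself.
    """
    return 1 + followees + followees * msgs_per_followee

F, M = 10, 10  # hypothetical: 10 followees with 10 messages each
seeks = join_seeks(F, M)
print(seeks, seeks / (F * M))  # 111 1.11
```

For a 100-message timeline this gives 111 seeks, or about 1.1 seeks per message.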
A disk doesn't do many I/O operations per second (IOPS), and the ballpark figure we work with is 100 IOPS per disk. At roughly one seek per message, a modest timeline of 100 messages means we can read only 1 user's timeline per second per disk! Disk bandwidth, on the other hand, is on the order of 100 MB/s. If a message is somewhere in the region of 140 bytes (to choose a number completely at random), the upper bound for reading messages sequentially is roughly 750,000 messages (or 7,500 user timelines) per second per disk.
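The following sketch reproduces the arithmetic with the ballpark figures from the paragraph above. One assumption is needed: the 750,000 figure comes out if "MB" is read as 2^20 bytes. With 10^8 bytes per second the bound is closer to 714,000 messages per second.

```python
IOPS = 100               # random seeks per second per disk (ballpark)
BANDWIDTH = 100 * 2**20  # ~100 MB/s sequential, assuming MB = 2**20 bytes
MSG_BYTES = 140          # assumed message size
TIMELINE = 100           # messages per timeline

# Seek-bound: roughly one seek per message.
seek_bound_timelines = IOPS / TIMELINE
# Bandwidth-bound: messages read back to back.
bw_bound_msgs = BANDWIDTH / MSG_BYTES
bw_bound_timelines = bw_bound_msgs / TIMELINE

print(seek_bound_timelines)       # 1.0
print(round(bw_bound_msgs))       # 748983
print(round(bw_bound_timelines))  # 7490
```

The gap between the two bounds, more than three orders of magnitude, is what denormalisation tries to recover.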
Denormalisation
By denormalising the schema we can make our reads sequential and therefore make much better use of our disk bandwidth. Cassandra allows extremely wide rows (up to 2 billion columns), and we can take advantage of this to design a denormalised schema with far higher read throughput for the 'read-timeline' operation.
Cassandra Model
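In the schema below, the outermost names are column families, the next level holds row keys, and the innermost entries map column keys (column names) to column values.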
Edges: {
followee_id: {
user1: 1,
user2: 1,
...
}
}
UserMessages: {
user1: {
timestamp1: message1,
timestamp2: message2,
...
}
}
In the denormalised model, the Users and Messages entities have been joined together to form the UserMessages column family. Each row key is a user ID, each column name in that row is a timestamp, and each column value is a message in that user's timeline. Cassandra keeps a row's columns sorted by column name, so using timestamps as column names stores the timeline in time order. This lets us query a user's entire timeline with O(1) disk seeks to find the row, followed by sequential reads.
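To illustrate why a timeline read is cheap in this layout, here is a minimal in-memory sketch of one UserMessages row. WideRow and its methods are hypothetical stand-ins, not Cassandra's API. The sketch assumes only what the text states: columns are kept sorted by name (here, an integer timestamp). One lookup finds the row, and the newest messages then form a single contiguous run of columns.

```python
import bisect
from collections import defaultdict

class WideRow:
    """Hypothetical stand-in for one wide row, columns kept sorted by name."""

    def __init__(self):
        self.names = []   # column names (timestamps), always sorted
        self.values = {}  # column name -> column value (message)

    def insert(self, name, value):
        if name not in self.values:
            bisect.insort(self.names, name)  # keep storage order sorted on write
        self.values[name] = value            # re-using a name overwrites the value

    def latest(self, count):
        # The newest `count` columns are one contiguous run at the end.
        names = self.names[-count:][::-1] if count > 0 else []
        return [(n, self.values[n]) for n in names]

# UserMessages: row key (timeline owner) -> wide row of timestamp -> message.
user_messages = defaultdict(WideRow)

timeline = user_messages["alice"]  # one lookup finds the row
timeline.insert(1320600120, "bob: second")
timeline.insert(1320600000, "bob: first")
timeline.insert(1320600300, "carol: hi")
print(timeline.latest(2))  # [(1320600300, 'carol: hi'), (1320600120, 'bob: second')]
```

The sorting work happens on the write path (insert), so the read path (latest) is just a slice of already-ordered columns.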
The cost of denormalisation is duplication of data. In this case we duplicate each message by copying it into the timeline of every user who follows its author. This means we incur a write cost when broadcasting a message, since we must insert the same message once per follower. Luckily for us, Cassandra is optimised for high write throughput (writes perform only sequential I/O), and it is this performance profile that allows us to trade some write speed for increased read throughput.
Another, less obvious, cost of denormalisation is that we must now read the Edges column family when performing a write, since we must find out which users follow the message author. In practice it should be possible to fit this column family in cache even for a large number of users, so this lookup will only rarely hit disk.
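The write path described in the last two paragraphs (an Edges read followed by one copy of the message per follower) can be sketched with plain dictionaries standing in for the column families. The helper names are hypothetical. The sketch also assumes that Edges rows are keyed by the followed user, so that an author's followers come from a single row read.

```python
from collections import defaultdict

# Hypothetical in-memory column families: row key -> {column name: value}.
edges = defaultdict(dict)          # followee -> {follower: 1} (assumed orientation)
user_messages = defaultdict(dict)  # timeline owner -> {timestamp: message}

def subscribe(follower, followee):
    # Only the column name carries data; 1 is a placeholder value.
    edges[followee][follower] = 1

def broadcast(author, timestamp, msg):
    """Fan-out on write: copy msg into every follower's timeline row.

    Returns the number of column writes, which grows with the follower count.
    """
    followers = edges.get(author, {})  # the extra Edges read on the write path
    for follower in followers:
        user_messages[follower][timestamp] = msg
    return len(followers)

subscribe("alice", "bob")
subscribe("carol", "bob")
subscribe("alice", "dave")
writes = broadcast("bob", 1320600000, "bob: hello")
print(writes)               # 2
print(dict(user_messages))  # {'alice': {1320600000: 'bob: hello'}, 'carol': {1320600000: 'bob: hello'}}
```

The returned write count makes the trade-off concrete. A broadcast costs one Edges read plus one column write per follower, while reading a timeline remains a single row lookup.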
Conclusion
We have seen how to exploit some of the properties of Cassandra (very wide rows and a heavily write-optimised performance profile) to denormalise our data and greatly improve our read throughput.