SoundCloud for Developers

Discover, connect and build

Backstage Blog RSS

You're browsing posts of the category Data

  • July 3rd, 2014 Data Real-time counts with Stitch By Emily Green

    We made Stitch to provide counts and time-series of counts in real-time.

    Stitch was initially developed to do the timelines and counts for our stats pages. This is where users can see which of their tracks were played and when.

    SoundCloud Stats Screenshot

    Stitch is a wrapper around a Cassandra database. It has a web application that provides read-access to the counts through an HTTP API. The counts are written to Cassandra in two distinct ways, and it's possible to use either or both of them:

    For real-time updates, Stitch has a processor application that handles a stream of events coming from a broker and increments the appropriate counts in Cassandra.
    The batch part is a MapReduce job running on Hadoop that reads event logs, calculates the overall totals, and bulk loads this into Cassandra.

    The problem

    The difficulty with real-time counts is that incrementing is a non-idempotent operation, which means that if you apply the same increment twice you get a different value to when you apply it once. If an incident affects our data pipeline, and the counts are wrong, we cannot fix by simply re-feeding the day's events through the processors; we would risk double counting.

    Our first solution

    Initially, Stitch only supported real-time updates and addressed this problem with a MapReduce job named The Restorator that performed the following actions:

    1. Calculated the expected totals
    2. Queried Cassandra to get the values it had for each counter
    3. Calculated the increments needed to apply to fix the counters
    4. Applied the increments

    Meanwhile, to stop the sand shifting under its feet, The Restorator needed to coordinate a locking system between itself and the real-time processors, so that the processors did not try to simultaneously apply increments to the same counter, resulting in a race-condition. It used ZooKeeper for this.

    As you can probably tell, this was quite complex, and it could take a long time to run. But despite this, it did indeed work.

    Our second solution

    We got a new use-case; a team wanted to run Stitch purely in batch. This is when we added the batch layer and took the opportunity to revisit the way Stitch was dealing with the non-idempotent increments problem. We evolved to a Lambda Architecture style approach, where we combine a fast real-time layer for a possibly inaccurate but immediate count, with a batch slow layer for an accurate but delayed count. The two sets of counts are kept separately and updated independently, possibly even living on different database clusters. It is up to the reading web application to return the right version when queried. At its naïvest, it returns the batch counts instead of the real-time counts whenever they exist.

    Stitch Diagram

    Thanks go to Kim Altintop and Omid Aladini who created Stitch, and John Glover who continues to work on it with me.

    If this sounds like the sort of thing you'd like to work on too, check out our jobs page.

  • July 5th, 2011 Data MySQL for Statistics – Old Faithful By Sean Treadway

    MySQL turns out to be a good Swiss Army Knife for persistence, if used wisely. Understanding disk access patterns driven by your storage engine is key. Choosing a read or write optimized disk layout will get you very far. We chose a read-optimized disk layout using InnoDB and MySQL for statistics.

    While our wheels were spinning trying to find out why our statistics storage patterns were causing MongoDB to thrash our disks, we started looking for an emergency alternative with the technology that we already had: MySQL+InnoDB.

    What we knew:

    • We need to persist a log entry for each play
    • The play events are coming in sequentially ordered by time

    What we needed:

    • Counts for 5 different dimensions of the plays: by referrer, listener, 3rd party application, and country, and the total count.
    • Counts for 3 different time ranges – all time, currently displayed period and previous period to calculate the percentage changed.
    • A time series of the totals for a single dimension over the displayed period

    What we had so far (don’t do this at home!):

    • A single log table called plays in our online database with the below schema
    • A secondary index on each dimension used by the statistics pages to build aggregates using the count aggregate function

    We posed the question, “what is the minimum we could do with our current InnoDB log table so that it can service our read load”.

    CREATE TABLE plays (
      id int(11) NOT NULL AUTO_INCREMENT,
      created_at datetime DEFAULT NULL,
      track_id int(11) DEFAULT NULL,
      user_id int(11) DEFAULT NULL,
      referer text COLLATE utf8_unicode_ci,
      country varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
      api_consumer_key varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
      PRIMARY KEY (id),
      KEY index_plays_on_track_id (track_id),
      KEY index_plays_on_user_id (user_id),
      KEY index_plays_on_created_at (created_at),
      KEY index_plays_on_referer (referer(50))
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

    The impending doom of letting this table continue to grow was also approaching fast. This table was ticking along at multiple billions of rows, consuming hundreds of gigabytes of data, had no clear shard key and the indexes added another 80% on top of the data and filling too much storage space on our OLTP databases.

    To answer the question “what can we do to serve stats within a second”, we should also find the root cause of “why aren’t we currently servicing our current read load?”

    Still bruised from getting whacked by the reality stick from our previous experiences, the first step was to trace the entire operation of a typical query from request to response, paying close attention to where the IO operations could be occurring and take the assumption that they will always occur. We no longer assumed that our dataset would reside in memory.

    Our typical query looked something like this:

    select track_id, country, count(*) as total
      from plays
     where track_id = 123
       and created_at between now() - interval 7 day and now()
     group by track_id, country

    This was performed over date intervals of week, last week and all time. For the other dimensions, we’d group by the additional column like country.

    The first thing that gets hit is the query planner. An explain on query for a 1 row table reveals this:

     select_type: SIMPLE
            table: plays
             type: ref
    possible_keys: index_plays_on_track_id,index_plays_on_created_at
              key: index_plays_on_track_id
          key_len: 5
              ref: const
             rows: 1
            Extra: Using where; Using temporary; Using filesort

    Ah, nice and simple, traverse the track_id index then load the row, fill out a temporary table with the aggregate fields track_id and country, then split up that temporary table into chunked buffers sorting each buffer, potentially writing it to disk, then perform a merge sort from all the temporary buffers.

    It looks so obvious now because we have spent the effort to read the MySQL source, read the documentation and walked through every step and the tuning variables that affect each step. We currently don’t have a DBA on staff to ask, so thankfully we could teach ourselves from the online manual and source code.

    Taking a little detour, it’s good to explain how we’re interpreting the Extra column one at a time. In reverse:

    Using filesort

    A filesort doesn’t necessarily mean it goes to files. This article by Sergey Petrunia gives a very good overview of what a filesort actually does, putting in clear terms what is suggested for optimizing MySQL order by clauses.

    We were obviously not hitting any group by optimizations with this query.

    Using temporary

    This is a byproduct of order by, group by or select distinct as described by how MySQL uses internal temporary tables. MySQL must use a temporary table to satisfy these kinds of query. If you have a lot of temporary tables created going to disk by monitoring the Created_tmp_disk_tables status variable, you should increase your tmp_table_size and max_heap_table_size variables to prevent “swapping” on your temporary partition.

    Using where

    This means that the condition of the query cannot be fulfilled solely from the index. The row data must be fetched so that the condition “created_at between ? and ?” can be tested. The number of rows in the rows column will indicate how many rows MySQL guesses it needs to pull out from the storage engine.

    How the rows are fetched is actually specific to the storage engine and this is where we learned our most important lesson about InnoDB: Everything is a BTree. The InnoDB table itself is a clustered index on the primary key with the leaves containing the row data. The secondary indexes (the ones you add to your table) are organized by indexed columns with their leaves containing the primary key of the row. References: [Xarb][xarb], MySQL Indexes.

    When you see “using where”, you are making 2 index lookups. One on the secondary index to find the primary key, and one on the clustered index (table) to find the row data.

    What this means for the plays table is a classic example of how easy it is to optimize for writes or reads but not both. The plays table is optimized for writes as plays come in, the surrogate ID gets incremented and ends up settling cosy next to the warm data on the right side of the BTree. A read comes in and performs a quick traversal down a secondary index. What it gets is a list of primary keys that can range from 0 to max(id). The lookup of those primary keys turn a sequential secondary index traversal into effectively multiple random lookups on the clustered index. Traverse primary key – load database page, traverse next primary key – load database page, seek to next primary key – load page, etc…

    In the worst-case, we incur 1 seek for every statistic. At about 10ms per seek… and some tracks with many millions of plays… we effectively try to load almost all pages from the table. Now we have the answer why we’re not able to perform.

    How to fix

    What we now know:

    • We have effectively random reads on the full plays table using secondary indexes

    What we needed:

    • Sequential reads by not reading the rows in our AUTO_INCREMENT primary key order

    Covering indexes

    This is one very good option to avoid the 2nd index lookup on the primary key in the table. A Covering Index contains all columns that will satisfy the SELECT, WHERE, ORDER BY and GROUP BY clauses in a composite index so after you traverse the secondary index, you already have all the data you need and do not need to go to the table storage. This will show up in the EXPLAIN as “Using index” and is a sign that you’ll perform only a single BTree traversal instead of one for the index and one for each of rows that index points to.

    Designing a covering index for our plays table requires careful thought to the ordering of the indexed columns. We need to order our indexed columns so that the composite index that composes the covering index can be reused for multiple queries. We also need to be thrifty what we’re storing here, because a covering index will make a partial copy of column data in the index.

    For the country counts, we could do something like this:

    create index covering_country_index on plays (track_id, created_at, country);

    And the above select would explain like this:

    Using where; Using index; Using temporary; Using filesort

    Almost – we are using columns from the index but because the index is ordered by track_id, created_at and the GROUP BY is on track_id, country, the GROUP BY will need to create the temporary table from the row data to perform the aggregation. If we reorder the columns in the index:

    create index covering_country_index on plays (track_id, country, created_at);

    The query explains a bit better:

    Using where; Using index

    Using where?! Where is this coming from? We have a covering index that has the first two columns used by the GROUP BY, all columns in the WHERE and all columns in the SELECT.

    Ah, always read the fine print:

    Even if you are using an index for all parts of a WHERE clause, you may see Using where if the column can be NULL.

    This table is old… like SoundCloud early days prototype to get it out the door old. It was created using the ActiveRecord schema language which didn’t encourage decent table schema design. ALL of the columns can be NULL… shit.

    So you want to ALTER TABLE eh?

    No worries, we can just convert the column type to be able to create our covering index. Being in production, the table should still be readable while it’s being migrated to the new “NOT NULL” columns. Rehearsing the necessary operations cost on production dataset took over 1 day before looking at the temporary table size to realize it was only 20% completed.

    The index creation would also lock the table due to a known bug to perform online table alterations on columns that are in the UTF8 charset. UTF8 column alterations will take out a read lock.

    Alter table on plays to change the charset, and add copies of all of our data into covering indexes causing up to a day of unavailability to achieve this wasn’t worth it. If we’re going to fix this, we may as well fully separate our write-optimized log from our read-optimized index.

    Aha moment

    What we now know:

    • Covering indexes can remove the filesort and surrogate key lookup from the clustered index.

    What we need:

    • A table and index format that does not cause random I/O reads.

    We were looking at duplicating the dataset of the logging table plays in the indexes. Only if we had tighter control of the indexes we could tune them to meet the requirements of sequential disk lookups and in-memory aggregation…

    Use the clustered index… (luke).

    InnoDB requires a primary key because this is the clustered index key. For almost all cases, a surrogate AUTO_INCREMENT integer key is the best because it will have good insertion performance and small overhead for the leaf nodes of the secondary indexes. But, you can specify an alternative key, even a composite key as long as it follows some constraints. Here’s the money quote, again from the excellent documentation:

    • If you define a PRIMARY KEY on your table, InnoDB uses it as the clustered index.
    • If you do not define a PRIMARY KEY for your table, MySQL picks the first UNIQUE index that has only NOT NULL columns as the primary key and InnoDB uses it as the clustered index.
    • If the table has no PRIMARY KEY or suitable UNIQUE index, InnoDB internally generates a hidden clustered index on a synthetic column containing row ID values. The rows are ordered by the ID that InnoDB assigns to the rows in such a table. The row ID is a 6-byte field that increases monotonically as new rows are inserted. Thus, the rows ordered by the row ID are physically in insertion order.

    Accessing a row through the clustered index is fast because the row data is on the same page where the index search leads. If a table is large, the clustered index architecture often saves a disk I/O operation when compared to storage organizations that store row data using a different page from the index record. (For example, MyISAM uses one file for data rows and another for index records.)

    The primary key must contain unique, non-null columns. This doesn’t work very well for the raw plays table because we could have 2 plays on the same track in the same second from the same country and would like to count both. Not to mention an increase in the key size would explode the size of our secondary indexes.

    A new table schema could take advantage of the clustered index that InnoDB provides. The trade-off is bloated secondary indexes due to the secondary index’s leaf nodes containing the primary key of the table. If we don’t need secondary indexes, then lets look at how to shape a clustered index.

    Introducing aggregate tables

    To support the time series and totals query for both per-user and per-track using unique primary keys that help shape our clustered index leveraged the “bucket” model from our prior attempts at the stats. We looked at the minimum granularity we wanted for the statistics and found that hourly and daily were sufficient for all aggregations. Given that, we could create a schema that looked something like this:

    create table plays_by_hour (
      owner_id integer unsigned not null,
      track_id integer unsigned not null,
      hour timestamp not null,
      count integer unsigned not null,
      primary key (owner_id, track_id, hour)

    Our time series per track query could be as simple as:

    select owner_id, track_id, hour, count
      from plays_by_hour
     where owner_id = 123
       and track_id = 123
       and hour between now() - interval 1 day and now()

    This explains as:

      select_type: SIMPLE
            table: plays_by_hour
             type: range
    possible_keys: PRIMARY
              key: PRIMARY
          key_len: 12
              ref: NULL
             rows: 1
            Extra: Using where

    The “Using where” in this explain is effectively the same thing as “Using index” because all columns in the where make up the branches in the clustered index and since we only need the count, that value stored on the leaves is already loaded from disk from the index traversal.

    The composition of coarser grained time series isn’t so bad now because we have loaded adjacent pages during the primary key choice.

    To get the total for that time series on day groupings, all that is needed is to sum the counts:

    select track_id, sum(count) as total
      from plays_by_hour
     where owner_id = 123
       and track_id = 123
       and hour between now() - interval 1 month and now()
     group by owner_id, track_id

    Back of the hand math is if we have 64 bytes per row, and even if we’re looking at 5 years of counts, our upper bound is no more than 3 MB to pull off of disk. When the tables are optimized, this is a sequential read at ~50MB/s, keeping well under 100ms which is acceptable for this application.

    By including the owner_id in the primary key, we get a degree of locality and a handy partitioning key for our queries as well.

    For the remaining metrics around countries and apps, we just added one more column to the clustered index.

    create table plays_by_country_day (
      track_id integer unsigned not null,
      day timestamp not null,
      country char(2) not null,
      count integer unsigned not null,
      primary key (track_id, day, country)

    (Don’t forget to pick the simplest character encoding to support your dataset like latin1 if you have any character columns).

    Reasoning about I/O

    In our MongoDB experience we spent a long time running experiments to identify the worst case performance. With MySQL and InnoDB, we have very clear documentation on the on-disk layout of the tables and indexes. We can reason very well about the worst case performance of our non-optimized plays table and index layout. Now we can reason very well about the worst case performance of this new table layout. The explain output is somewhat limited. We have one signal out of MySQL “Using where” and copious documentation to give that meaning.

    We chose this approach because we could reason about I/O. We also can reason about space. By using a partial aggregation on hour, we know that upper bound for storage is one row for every hour since the creation of the track. By treating the data as the (clustered) index, we can reason both about the index and data size, because they are the same! We can calculate storage requirements by correlating track growth to these aggregate tables by hour, not by popularity.

    What we also get is the desired sequential disk access on a cold cache because the row data (the count) is on the same page that gets traversed. We also get data locality of the time series for balanced BTrees. However, the persistence of the BTree can produce page splits where pages are not physically located next to each other, but due to the separation of these read-optimized tables from the source-of-truth, write-optimized event storage means we can run OPTIMIZE TABLE weekly which will reorganize the physical pages of the aggregate tables so that they are mostly sequential.

    The catch is, by using aggregate tables in read-optimized form, there is no way we can sustain the writes directly from our raw stream of events. We must also use an ETL process that batches the updates per hour and per day from the log table. We started running ours every 15 minutes, but found that we must tune that back to every hour. Not so realtime. To get the realtime behavior another in-memory system would be the best bet to lay over the persistence for the most recent hour/day of counts.


    This is not the end of our stats story. We want realtime stats for ourselves, and want you to have them too! This solution won’t last forever, but from start to finish, leveraging all the knowledge gained from an incomplete Cassandra implementation, and incomplete MongoDB implementation plus our in-house experience with MySQL, it took 3 months to research, design and deploy. With about 2 weeks purely on the data migration.

    This is not a story about how NoSQL fails, how MySQL wins, or how to code like a lolcat. It’s a story of what we’re doing to get by with the core physical limitations of CPU/Memory/Network/Disk and operational resources, one order of magnitude of user growth at a time.

    Speaking with others about this solution has also led to some insights. SSDs will likely make the data locality of using a clustered index a non-issue. And we could generalize the table layout for other metrics as well, instead of saying “plays_by_country_day” it would be “daily_stats_by_country” and include a 2 byte “metric_id” that would indicate this was a ‘play’ rather than ‘download’.

    Understanding the lower and upper bounds of your I/O profile is critical when designing for consistent performance, data-driven systems with any technology. Good documentation and understanding of your storage engine’s implementation is the key to achieve that.


    If you’re storing and serving hundreds of GB or TB of stats for your customers, how have you designed your system?