This is a story of how we adapted our architecture over time to accommodate growth.
Scaling is a luxury problem and surprisingly has more to do with organization than implementation. For each change we addressed the next order of magnitude of users we needed to support, starting in the thousands; now we’re designing for the hundreds of millions. We identified our bottlenecks and addressed them as simply as possible by introducing clear integration points in our infrastructure to divide and conquer each problem individually.
By identifying and extracting points of scale into smaller problems and having well defined integration points when the time arrived, we are able to grow organically.
From day one, we had the simple need of getting each idea out of our heads and in front of eyeballs as quickly as possible. During this phase, we used a very simple setup:
Apache was serving our image/style/behavior resources, and Rails backed by MySQL provided an environment where almost all of our product could be modeled, routed and rendered quickly. Most of our team understood this model and could work well together, delivering a product that is very similar to what we have today.
We consciously chose not to implement high availability at this point, knowing what it would take when that time hopefully arrived. At this point we left our private beta, revealing SoundCloud to the public.
Our primary cost optimization was for opportunity, and anything that got in the way of us developing the concepts behind SoundCloud was avoided. For example, when a new comment was posted, we blocked until all followers were notified, knowing that we could make that asynchronous later.
In the early stages we were conscious to ensure we were not only building a product, but also a platform. Our Public API was developed alongside our website from the very beginning. We’re now driving the website with the same API we were offering to 3rd party integrations.
Apache served us well, but we were running Rails app servers on multiple hosts, and the routing and virtual host configuration in Apache was cumbersome to keep in sync between development and production.
The Web tier’s primary responsibility is to manage and dispatch incoming web requests, as well as to buffer outbound responses so as to free up an application server for the next request as quickly as possible. This meant that the better our connection pooling and content-based routing configuration, the stronger this tier would be.
At this point we replaced Apache with Nginx and reduced our web tier’s configuration complexity, but our architecture didn’t change.
Nginx worked great, but as we were growing, we found that some workloads took significantly more time than others (on the order of hundreds of milliseconds).
If you’re working on a slow request when a fast request arrives, the fast request has to wait until the slow request finishes. This is called the “head of the line blocking” problem. With multiple application servers, each with its own listen socket backlog, the situation is analogous to a grocery store, where you inevitably stand at one register and watch all the other registers move faster than your own.
Around 2008 when we first developed the architecture, concurrent request processing in Rails and ActiveRecord was fairly immature. Even though we felt confident that we could audit and prepare our code for concurrent request processing, we did not want to invest the time to audit our dependencies. So we stuck with the model of a single concurrency per application server process and ran multiple processes per host.
In Kendall’s notation, once we’ve sent a request from the web server to the application server, the request processing can be modeled by an M/M/1 queue. The response time of such a queue depends on all prior requests, so if we drastically increase the average work time of one request, the average response time also drastically increases.
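As a rough illustration of that sensitivity: the mean response time of an M/M/1 queue is S / (1 - ρ), where S is the mean work time and ρ = λS is the utilization at arrival rate λ. The sketch below plugs in made-up numbers (5 requests per second, 100 ms versus 150 ms of work); the function name and the figures are assumptions for illustration, not measurements from our system.

```ruby
# Mean response time (queueing + service) of an M/M/1 queue.
# arrival_rate: requests per second (lambda)
# service_time: mean seconds of work per request (S)
def mm1_response_time(arrival_rate, service_time)
  utilization = arrival_rate * service_time
  raise ArgumentError, "queue is unstable at utilization >= 1" if utilization >= 1.0
  service_time / (1.0 - utilization)
end

# Illustrative numbers only.
fast = mm1_response_time(5.0, 0.10) # utilization 0.50
slow = mm1_response_time(5.0, 0.15) # utilization 0.75
puts format("100 ms work time -> %.0f ms response time", fast * 1000)
puts format("150 ms work time -> %.0f ms response time", slow * 1000)
```

With these numbers, a 50% increase in mean work time triples the mean response time, from 200 ms to 600 ms.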
Of course, the right thing to do is to make sure our work times are consistently low for any web request, but we were still in the period of optimizing for opportunity, so we decided to continue with product development and solve this problem with better request dispatching.
We looked at the Phusion Passenger approach of using multiple child processes per host but felt that we could easily fill each child with long-running requests. This is like having many queues with a few workers on each queue, simulating concurrent request processing on a single listen socket.
This changed the queue model from M/M/1 to M/M/c, where c is the number of child processes for every dispatched request. This is like the queue system found in a post office, or a “take a number, the next available worker will help you” kind of queue. This model reduces the response time by a factor of c for any job that was waiting in the queue, which is better, but assuming we had 5 children, we would just be able to accept an average of 5 times as many slow requests. We were already seeing a factor of 10 growth in the upcoming months, and had limited capacity per host, so adding only 5 to 10 workers was not enough to address the head of the line blocking problem.
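To get a feel for the M/M/c model, the sketch below computes its mean response time with the standard Erlang C formula and compares one pooled queue feeding five workers against five independent single-worker queues at the same per-worker load. The function names and the traffic figures (150 ms of work, 5 requests per second per worker) are assumptions chosen for illustration.

```ruby
# Probability that an arriving request has to wait in an M/M/c queue
# (the Erlang C formula).
# offered_load: arrival_rate * service_time, in Erlangs (a)
# servers: number of workers (c)
def erlang_c(offered_load, servers)
  raise ArgumentError, "unstable: offered load >= servers" if offered_load >= servers
  rho = offered_load.to_f / servers
  term = 1.0 # a^k / k!, starting at k = 0
  sum = 0.0
  servers.times do |k|
    sum += term
    term *= offered_load.to_f / (k + 1)
  end
  # After the loop, term == a^c / c!
  waiting = term / (1.0 - rho)
  waiting / (sum + waiting)
end

# Mean response time (wait + service) of an M/M/c queue.
def mmc_response_time(arrival_rate, service_time, servers)
  offered_load = arrival_rate * service_time
  mean_wait = erlang_c(offered_load, servers) * service_time / (servers - offered_load)
  mean_wait + service_time
end

separate = mmc_response_time(5.0, 0.15, 1)  # one worker per queue, 5 req/s each
pooled   = mmc_response_time(25.0, 0.15, 5) # one queue feeding five workers, 25 req/s
puts format("separate queues: %.0f ms, pooled queue: %.0f ms", separate * 1000, pooled * 1000)
```

With c = 1 the formula collapses to the M/M/1 case, which is a handy sanity check. At equal utilization, the pooled queue comes out well below the separate queues.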
We wanted a system that never queued, but if it did queue, the wait time in the queue was minimal. Taking the M/M/c model to the extreme, we asked ourselves “how can we make c as large as possible?”
To do this, we needed to make sure that a single Rails application server never received more than one request at a time. This ruled out TCP load balancing because TCP has no notion of an HTTP request/response. We also needed to make sure that if all application servers were busy, the request would be queued for the next available application server. This meant we must maintain complete statelessness between our servers. We had the latter, but didn’t have the former.
We added HAProxy into our infrastructure, configuring each backend with a maximum connection count of 1 and added our backend processes across all hosts, to get that wonderful M/M/c reduction in resident wait time by queuing the HTTP request until any backend process on any host becomes available. HAProxy entered as our queuing load balancer that would buffer any temporary back-pressure by queuing requests from the application or dependent backend services so we could defer designing sophisticated queuing in other components in our request pipeline.
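The effect of a single shared queue can be seen in a small deterministic simulation. It compares per-backend backlogs against one queue that hands each request to whichever backend frees up first, with one request per backend at a time. The workload (one 2-second request followed by 50 ms requests, two backends) and the round-robin assignment used for the per-backend case are assumptions made for illustration; this is not a model of HAProxy internals.

```ruby
Job = Struct.new(:arrival, :work)

# Each backend has its own backlog; jobs are assigned round-robin on arrival.
# Returns the time each job spent waiting before it started.
def round_robin_waits(jobs, backends)
  free_at = Array.new(backends, 0.0)
  jobs.each_with_index.map do |job, i|
    b = i % backends
    start = [job.arrival, free_at[b]].max
    free_at[b] = start + job.work
    start - job.arrival
  end
end

# One shared FIFO queue; each job goes to whichever backend frees up first.
def shared_queue_waits(jobs, backends)
  free_at = Array.new(backends, 0.0)
  jobs.map do |job|
    b = free_at.index(free_at.min)
    start = [job.arrival, free_at[b]].max
    free_at[b] = start + job.work
    start - job.arrival
  end
end

# Made-up workload: one slow request followed by seven fast ones, 100 ms apart.
jobs = [Job.new(0.0, 2.0)] + (1..7).map { |i| Job.new(i * 0.1, 0.05) }

puts "per-backend backlogs: #{round_robin_waits(jobs, 2).sum.round(2)} s total wait"
puts "shared queue:         #{shared_queue_waits(jobs, 2).sum.round(2)} s total wait"
```

In this workload the fast requests that land behind the slow one wait about 4.95 s in total with per-backend backlogs, while with the shared queue no request waits at all.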
I heartily recommend Neil J. Gunther’s work Analyzing Computer System Performance with Perl::PDQ to brush up on queueing theory and strengthen your intuition on how to model and measure queuing systems from HTTP requests all the way down to your disk controllers.
One class of request that took a long time was the fan-out of notifications from social activity. For example, when you upload a sound to SoundCloud, everyone who follows you will be notified. For people with many followers, if we were to do this synchronously, the request times would run into the tens of seconds. We needed to queue a job that would be handled later.
Around the same time we were considering how to manage our storage growth for sounds and images, and had chosen to offload storage to Amazon S3, keeping transcoding compute in Amazon EC2.
To coordinate these subsystems, we needed some middleware that would reliably queue, acknowledge and re-deliver job tickets on failure. We went through a few systems, but in the end settled on AMQP, implemented by RabbitMQ, because of its programmable topology.
To keep the same domain logic that we had in the website, we loaded up the Rails environment and built a lightweight dispatcher class with one queue per concern. The queues had a namespace that described estimated work times. By starting one dispatcher process for each class of work, bound to multiple queues in that work class, we created a priority system in our asynchronous workers without adding the complexity of message priorities to the broker. Most of our queues for asynchronous work performed by the application are namespaced with either “interactive” (under 250ms work time) or “batch” (any work time). Other namespaces were used that were specific to each application.
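A minimal sketch of that arrangement might look like the following. It is not our actual dispatcher: the `Dispatcher` class, the `QUEUES` hash standing in for the broker, and the queue and concern names are all hypothetical, and a real worker would consume from RabbitMQ rather than from an in-memory array.

```ruby
# Hypothetical in-memory stand-in for the broker: queue name => pending jobs.
QUEUES = Hash.new { |hash, name| hash[name] = [] }

class Dispatcher
  def initialize(work_class, handlers)
    @work_class = work_class # e.g. "interactive" or "batch"
    @handlers = handlers     # concern name => callable
  end

  # Names of the queues this dispatcher consumes: one per concern,
  # all inside its work-class namespace.
  def queue_names
    @handlers.keys.map { |concern| "#{@work_class}.#{concern}" }
  end

  # Drain every bound queue once and return the number of jobs handled.
  def run_once
    handled = 0
    @handlers.each do |concern, handler|
      queue = QUEUES["#{@work_class}.#{concern}"]
      until queue.empty?
        handler.call(queue.shift)
        handled += 1
      end
    end
    handled
  end
end

# One dispatcher process per work class; slow batch jobs never sit in
# front of interactive ones because they live in different queues.
done = []
interactive = Dispatcher.new("interactive", "notify" => ->(job) { done << job })
QUEUES["interactive.notify"] << "comment:1"
QUEUES["batch.transcode"] << "sound:42"
interactive.run_once
puts done.inspect
```

The priority comes from process allocation rather than from the broker: running more dispatcher processes for one work class gives that class more capacity.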
When we approached the hundreds-of-thousands-of-users mark, we saw we were burning too much CPU in the application tier, mostly spent in the rendering engine and Ruby runtime.
Instead of introducing Memcached to alleviate IO contention in the database like most applications, we aggressively cached partial DOM fragments and full pages. This turned into an invalidation problem, which we solved by maintaining, in Memcached, the reverse index of cache keys that also needed invalidation on model changes.
Our highest volume request was one specific endpoint that was delivering data for the widget. We created a special route for that endpoint in Nginx and added proxy caching to that stack, but wanted to generalize caching to the point where any endpoint could produce proper HTTP/1.1 cache control headers and would be treated well by an intermediary we control. Now our widget content is served entirely from our public API.
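The reverse-index idea can be sketched as follows. This is an illustration only: the `FragmentCache` class, its method names and the key formats are hypothetical, and a plain Hash stands in for Memcached.

```ruby
require "set"

class FragmentCache
  def initialize
    @store = {}                                         # cache key => rendered fragment
    @keys_by_model = Hash.new { |h, k| h[k] = Set.new } # model id => dependent cache keys
  end

  # Return the cached fragment, or render it and record which models it depends on.
  def fetch(key, depends_on:)
    return @store[key] if @store.key?(key)
    depends_on.each { |model_id| @keys_by_model[model_id] << key }
    @store[key] = yield
  end

  # Called when a model changes: drop every fragment that was rendered from it.
  def invalidate(model_id)
    @keys_by_model.delete(model_id)&.each { |key| @store.delete(key) }
  end
end

cache = FragmentCache.new
renders = 0
render = lambda do
  cache.fetch("tracks/7/header", depends_on: ["track:7", "user:1"]) do
    renders += 1
    "<div>header</div>"
  end
end

render.call                # miss: renders the fragment
render.call                # hit: served from the cache
cache.invalidate("user:1") # the user changed, so the fragment is dropped
render.call                # miss again
puts renders
```

The cost of this design is the bookkeeping on every cache write; in exchange, a model change can find every affected fragment without scanning the cache.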
We added Memcached and much later Varnish to our stack to handle backend partially rendered template caching and mostly read-only API responses.
Our worker pools grew, handling more asynchronous tasks. The programming model was similar for all of them: take a domain model and schedule a continuation with that model state to be processed at a later state.
Generalizing this pattern, we leveraged the after-save hooks in ActiveRecord models in a way we call ModelBroadcast. The principle is that when the business domain changes, events are dropped on the AMQP bus with that change for any asynchronous client that is interested in that class of change. This technique of decoupling the write path from the readers enables the next evolution of growth by accommodating integrations we hadn’t foreseen.
after_create do |r|
  broker.publish("models", "create.#{r.class.name}", r.attributes.to_json)
end
after_save do |r|
  broker.publish("models", "save.#{r.class.name}", r.changes.to_json)
end
after_destroy do |r|
  broker.publish("models", "destroy.#{r.class.name}", r.attributes.to_json)
end
This isn’t perfect, but it added a much needed non-disruptive, generalized, out-of-app integration point in the course of a day.
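On the consuming side, a client only receives the classes of change it binds to. Assuming the "models" exchange is an AMQP topic exchange (the routing keys above suggest it, but that is an assumption here), bindings are patterns in which `*` matches exactly one dot-separated word and `#` matches zero or more. The helper below is a hypothetical re-implementation of that matching rule for illustration; in practice the broker does this for you.

```ruby
# Does an AMQP-style topic binding pattern match a routing key?
# "*" matches exactly one dot-separated word, "#" matches zero or more.
def topic_match?(pattern, routing_key)
  match_words(pattern.split("."), routing_key.split("."))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?
  head, *rest = pattern
  if head == "#"
    # "#" may swallow any number of leading words, including none.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  elsif words.empty?
    false
  else
    (head == "*" || head == words.first) && match_words(rest, words.drop(1))
  end
end

# Hypothetical consumers and the bindings they might declare.
bindings = {
  "search-indexer" => "*.Track",  # every kind of change to tracks
  "audit-log"      => "#",        # everything
  "welcome-mailer" => "create.User"
}

key = "save.Track"
interested = bindings.select { |_, pattern| topic_match?(pattern, key) }.keys
puts "#{key} -> #{interested.join(', ')}"
```

Because the publisher knows nothing about these bindings, a new consumer can be added without touching the application that emits the events.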
Our most rapid data growth was the result of our Dashboard. The Dashboard is a personalized materialized index of activities inside of your social graph and the primary place to personalize your incoming sounds from the people you follow.
We have always had a storage and access problem with this component. Looking at the read and write paths separately, the read path needs to be optimized for sequential access per user over a time range. The write path needs to be optimized for random access where one event may affect millions of users’ indexes.
The solution required a system that could reorder writes from random to sequential, store them in a sequential format for reads, and grow to multiple hosts. Sorted string tables are a perfect fit for the persistence format, and with the promise of free partitioning and scaling added to the mix, we chose Cassandra as the storage system for the Dashboard index.
The intermediary steps started with the model broadcast and used RabbitMQ as a queue for staged processing, in three major steps: fan-out, personalization, and serialization of foreign key references to our domain models.
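The shape of the two access paths can be sketched in a few lines. This is a toy in-memory model, not how Cassandra or our workers are implemented: the `DashboardIndex` class, its methods and the follower data are hypothetical, and only the fan-out step of the pipeline is shown.

```ruby
# Hypothetical in-memory stand-in for the Dashboard index:
# user id => activities kept sorted by timestamp.
class DashboardIndex
  Activity = Struct.new(:timestamp, :sound_id)

  def initialize(followers)
    @followers = followers # author id => follower ids
    @rows = Hash.new { |h, user| h[user] = [] }
  end

  # Write path: one event touches the index of every follower.
  def fan_out(author_id, sound_id, timestamp)
    activity = Activity.new(timestamp, sound_id)
    @followers.fetch(author_id, []).each do |user_id|
      row = @rows[user_id]
      # Insert in timestamp order so that reads stay sequential.
      position = row.bsearch_index { |a| a.timestamp > timestamp } || row.length
      row.insert(position, activity)
    end
  end

  # Read path: a slice of one user's row over a time range.
  def read(user_id, from, to)
    @rows[user_id].select { |a| a.timestamp >= from && a.timestamp <= to }.map(&:sound_id)
  end
end

index = DashboardIndex.new(1 => [10, 11])
index.fan_out(1, :late_upload, 30)
index.fan_out(1, :early_upload, 10) # arrives out of order
puts index.read(10, 0, 100).inspect
```

Writes land in many rows at arbitrary positions, while each read touches a single row in time order, which is the asymmetry the storage system has to absorb.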
Our search is conceptually a back-end service that exposes a subset of data store operations over an HTTP interface for queries. Index updates are handled similarly to the Dashboard via ModelBroadcast, with some enhancement from database replicas, and index storage is managed by Elasticsearch.
To make sure users are properly notified when their dashboard updates, whether this is over iOS/Android push notifications, email or other social networks, we simply added another stage in the Dashboard workflow that receives messages when a dashboard index is updated. Agents can get that completion event routed to their own AMQP queues via the message bus to initiate their own logic. Reliable messages at the completion of persistence are part of the eventual consistency we work with throughout our system.
Our statistics, offered to logged-in users at https://soundcloud.com/you/stats, also integrate via the broker, but instead of using ModelBroadcast, we emit special domain events that are queued up in a log and then rolled up into a separate database cluster for fast access across the various time ranges.
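A roll-up of this kind can be sketched as below. The event shape (`:sound_id` and an epoch-seconds `:at` field), the hour and day buckets, and the `roll_up` function are assumptions for illustration; the real event format and storage are not shown here.

```ruby
# Roll a log of timestamped events up into per-hour and per-day counters,
# keyed by [sound_id, granularity, bucket].
def roll_up(events)
  counts = Hash.new(0)
  events.each do |event|
    time = Time.at(event[:at]).utc
    counts[[event[:sound_id], :hour, time.strftime("%Y-%m-%dT%H")]] += 1
    counts[[event[:sound_id], :day, time.strftime("%Y-%m-%d")]] += 1
  end
  counts
end

# Made-up log: three plays of the same sound across two hours.
log = [
  { sound_id: 42, at: 0 },
  { sound_id: 42, at: 1800 },
  { sound_id: 42, at: 3600 }
]
roll_up(log).each { |key, count| puts "#{key.inspect} => #{count}" }
```

Precomputing one counter per time granularity is what keeps reads over a long range cheap: a yearly view sums daily buckets instead of scanning raw events.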
We have established some clear integration points in the broker for asynchronous write paths and in the application for synchronous read and write paths to backend services.
Over time, the application server’s codebase has collected both integration and functional responsibilities. As the product development settles, we have much more confidence now to decouple the function from the integration to be moved into backend services that can be consumed à la carte by not only the application but by other backend services, each with a private namespace in the persistence layer.
The way we develop SoundCloud is to identify the points of scale then isolate and optimize the read and write paths individually, in anticipation of the next magnitude of growth.
At the beginning of the product, our read and write scaling limitations were consumer eyeballs and developer hours. Today, we’re engineering for the realities of limited IO, network and CPU. We have the integration points set up in our architecture, all ready for the continued evolution of SoundCloud!