Monday, November 14, 2011

SpiderDuck: Twitter's Real-time URL Fetcher

Tweets often contain URLs or links to a variety of content on the web, including images, videos, news articles and blog posts. SpiderDuck is a service at Twitter that fetches all URLs shared in Tweets in real-time, parses the downloaded content to extract metadata of interest and makes that metadata available for other Twitter services to consume within seconds.

Several teams at Twitter need to access the linked content, typically in real-time, to improve Twitter products. For example:

  • Search to index resolved URLs and improve relevance
  • Clients to display certain types of media, such as photos, next to the Tweet
  • Tweet Button to count how many times each URL has been shared on Twitter
  • Trust & Safety to aid in detecting malware and spam
  • Analytics to surface a variety of aggregated statistics about links shared on Twitter

Background

Prior to SpiderDuck, Twitter had a service that resolved all URLs shared in Tweets by issuing HEAD requests and following redirects. While this service was simple and met the needs of the company at the time, it had a few limitations:

  • It resolved the URLs but did not actually download the content. The resolution information was stored in an in-memory cache but not persisted durably to disk. This meant that if the in-memory cache instance was restarted, data would be lost.
  • It did not implement politeness rules typical of modern bots, for example, rate limiting and following robots.txt directives.

Clearly, we needed to build a real URL fetcher that overcame the above limitations and would meet the company’s needs in the long term. Our first thought was to use or build on top of an existing open source URL crawler. We realized though that almost all of the available crawlers have two properties that we didn't need:

  • They are recursive crawlers. That is, they are designed to fetch pages and then recursively crawl the links extracted from those pages. Recursive crawling involves significant complexity in crawl scheduling and long term queuing, which isn’t relevant to our use case.
  • They are optimized for large batch crawls. What we needed was a fast, real-time URL fetcher.

Therefore, we decided to design a new system that could meet Twitter’s real-time needs and scale horizontally with its growth. Rather than reinvent the wheel, we built the new system largely on top of open source building blocks, thus still leveraging the contributions of the open source community.

This is typical of many engineering problems at Twitter – while they resemble problems at other large Internet companies, the requirement that everything work in real-time introduces unique and interesting challenges.

System Overview

Here’s an overview of how SpiderDuck works. The following diagram illustrates its main components.

The SpiderDuck architecture

Kestrel: This is a message queuing system widely used at Twitter for queuing incoming Tweets.

Schedulers: These jobs determine whether to fetch a URL, schedule the fetch, and follow redirect hops, if any. After the fetch, they parse the downloaded content, extract metadata, and write the metadata to the Metadata Store and the raw content to the Content Store. Each scheduler performs its work independently of the others; that is, any number of schedulers can be added to horizontally scale the system as Tweet and URL volume grows.

Fetchers: These are Thrift servers that maintain short-term fetch queues of URLs, issue the actual HTTP fetch requests and implement rate limiting and robots.txt processing. Like the Schedulers, Fetchers scale horizontally with fetch rate.

Memcached: This is a distributed cache used by the fetchers to temporarily store robots.txt files.

Metadata Store: This is a Cassandra-based distributed hash table that stores page metadata and resolution information keyed by URL, as well as fetch status for every URL recently encountered by the system. This store serves clients across Twitter that need real-time access to URL metadata.

Content Store: This is an HDFS cluster for archiving downloaded content and all fetch information.

We will now describe the two main components of SpiderDuck -- the URL Scheduler and the URL Fetcher -- in more detail.

The URL Scheduler

The following diagram illustrates the various stages of processing in the SpiderDuck Scheduler.

The URL Scheduler

Like most of SpiderDuck, the Scheduler is built on top of an open source asynchronous RPC framework developed at Twitter called Finagle. (In fact, this was one of the earliest projects to utilize Finagle.) Each box in the diagram above, except for the Kestrel Reader, is a Finagle Filter – an abstraction that allows a sequence of processing stages to be easily composed into a fully asynchronous pipeline. Being fully asynchronous allows SpiderDuck to handle high throughput with a small, fixed number of threads.

The Kestrel Reader continuously polls for new Tweets. As Tweets come in, they are sent to the Tweet Processor, which extracts URLs from them. Each URL is then sent to the Crawl Decider stage. This stage reads the Fetch Status of the URL from the Metadata Store to check if and when SpiderDuck has seen the URL before. The Crawl Decider then decides whether the URL should be fetched based on a pre-defined fetch policy (that is, do not fetch if SpiderDuck has fetched it in the past X days). If the Decider determines to not fetch the URL, it logs the status to indicate that processing is complete. If it determines to fetch the URL, it sends the URL to the Fetcher Client stage.
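
The fetch-policy check in the Crawl Decider can be sketched as follows. This is an illustration, not SpiderDuck's actual code; the record and method names, and the 30-day window, are assumptions.

```scala
// Hypothetical sketch of the Crawl Decider's fetch policy: fetch a URL only
// if SpiderDuck has never seen it, or last fetched it more than
// `windowDays` ago. All names and the window value are illustrative.
case class FetchStatus(lastFetchMillis: Long)

object CrawlDecider {
  val windowDays = 30L                      // "past X days" in the policy; value assumed
  val dayMillis  = 24L * 60 * 60 * 1000

  def shouldFetch(status: Option[FetchStatus], nowMillis: Long): Boolean =
    status match {
      case None                 => true     // never seen before: fetch it
      case Some(FetchStatus(t)) => nowMillis - t > windowDays * dayMillis
    }
}
```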

The Fetcher Client stage uses a client library to talk to the Fetchers. The client library implements the logic that determines which Fetcher will fetch a given URL; it also handles the processing of redirect hops. (It is typical to have a chain of redirects because URLs posted on Twitter are often shortened.) A context object is associated with each URL flowing through the Scheduler. The Fetcher Client adds all fetch information including status, downloaded headers, and content into the context object and passes it on to the Post Processor. The Post Processor runs the extracted page content through a metadata extractor library, which detects page encoding and parses the page with an open-source HTML5 parser. The extractor library implements a set of heuristics to retrieve page metadata such as title, description, and representative image. The Post Processor then writes all the metadata and fetch information into the Metadata Store. If necessary, the Post Processor can also schedule a set of dependent fetches. An example of dependent fetches is embedded media, such as images.

After post-processing is complete, the URL context object is forwarded to the next stage that logs all the information, including full content, to the Content Store (HDFS) using an open source log aggregator called Scribe. This stage also notifies interested listeners that the URL processing is complete. The notification uses a simple Publish-Subscribe model, which is implemented using Kestrel’s fanout queues.

All processing steps are executed asynchronously – no thread ever waits for a step to complete. All state related to each URL in flight is stored in the context object associated with it, which makes the threading model very simple. The asynchronous implementation also benefits from the convenient abstractions and constructs provided by Finagle and the Twitter Util libraries.

The URL Fetcher

Let’s take a look at how a Fetcher processes a URL.

The URL Fetcher

The Fetcher receives the URL through its Thrift interface. After basic validation, the Thrift handler passes the URL to a Request Queue Manager, which assigns it to the appropriate Request Queue. A scheduled task drains each Request Queue at a fixed rate. Once the URL is pulled off of its queue, it is sent to the HTTP Service for processing. The HTTP service, built on top of Finagle, first checks if the host associated with the URL is already in its cache. If not, it creates a Finagle client for it and schedules a robots.txt fetch. After the robots.txt is downloaded, the HTTP service fetches the permitted URL. The robots.txt file itself is cached, both in the in-process Host Cache as well as in Memcached to prevent its re-fetch for every new URL that the Fetcher encounters from that host.
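
The two-level robots.txt caching described above can be sketched like this; the class and method names are illustrative, and Memcached is stood in for by a plain map.

```scala
// Sketch of the Fetcher's robots.txt lookup: check the in-process Host Cache
// first, then Memcached, and fetch from the web server only on a double miss.
// Names are illustrative; `memcached` here is just a stand-in map.
import scala.collection.mutable

class RobotsCache(memcached: mutable.Map[String, String]) {
  private val hostCache = mutable.Map.empty[String, String]

  def get(host: String)(fetch: String => String): String =
    hostCache.getOrElseUpdate(host,
      memcached.getOrElseUpdate(host, fetch(host)))   // fetch only on double miss
}
```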

Tasks called Vultures periodically examine the Request Queues and Host Cache to find queues and hosts that haven’t been used for a period of time; when found, they are deleted. The Vultures also report useful stats through logs and the Twitter Commons stats exporting library.

The Fetcher’s Request Queue serves an important purpose: rate limiting. SpiderDuck rate limits outgoing HTTP fetch requests per domain so as not to overload web servers receiving requests. For accurate rate limiting, SpiderDuck ensures each Request Queue is assigned to exactly one Fetcher at any point in time, with automatic failover to a different Fetcher in case the assigned Fetcher fails. A cluster suite called Pacemaker assigns Request Queues to Fetchers and manages failover. URLs are assigned to Request Queues based on their domains by a Fetcher client library. The default rate limit used for all web sites can be overridden on a per-domain basis, as needed. The Fetchers also implement queue backoff logic: if URLs are coming in faster than they can be drained, they reject requests to indicate to the client that it should back off or take other suitable action.
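
The fixed-rate drain behind this rate limiting can be sketched as follows; the class name, the drain interval, and the per-second rate are all assumptions for illustration.

```scala
// Illustrative fixed-rate drain for a per-domain Request Queue: a scheduled
// task calls drainTick() periodically (say, once per second) and releases at
// most `ratePerTick` URLs. Names and numbers are assumptions.
import scala.collection.mutable

class RequestQueue(ratePerTick: Int) {
  private val queue = mutable.Queue.empty[String]

  def enqueue(url: String): Unit = queue.enqueue(url)

  // Called at a fixed interval; returns the URLs released this tick.
  def drainTick(): Seq[String] =
    (1 to math.min(ratePerTick, queue.size)).map(_ => queue.dequeue())
}
```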

For security purposes, the Fetchers are deployed in a special zone in Twitter data centers called a DMZ. This means that the Fetchers cannot access Twitter’s production clusters and services. Hence, it is all the more important to keep them lightweight and self-contained, a principle which guided many aspects of the design.

How Twitter uses SpiderDuck

Twitter services consume SpiderDuck data in a number of ways. Most query the Metadata Store directly to retrieve URL metadata (for example, page title) and resolution information (that is, the canonical URL after redirects). The Metadata Store is populated in real-time, typically seconds after the URL is tweeted. These services do not talk directly to Cassandra, but instead to SpiderDuck Thrift servers that proxy the requests. This intermediate layer gives SpiderDuck the flexibility to transparently switch storage systems, if necessary. It also provides an avenue for higher-level API abstractions than would be possible if services interacted directly with Cassandra.

Other services periodically process SpiderDuck logs in HDFS to generate aggregate stats for Twitter’s internal metrics dashboards or conduct other types of batch analyses. The dashboards help us answer questions like “How many images are shared on Twitter each day?” “What news sites do Twitter users most often link to?” and “How many URLs did we fetch yesterday from this specific website?”

Note that services don’t typically tell SpiderDuck what to fetch; SpiderDuck fetches all URLs from incoming Tweets. Instead, services query information related to URLs after it becomes available. SpiderDuck also allows services to make requests directly to the Fetchers to fetch arbitrary content via HTTP (thus benefiting from our data center setup, rate limiting, robots.txt support and so on), but this use case is not common.

Performance numbers

SpiderDuck processes several hundred URLs every second. A majority of these are unique over the time window defined by SpiderDuck’s fetch policy, and hence get fetched. For URLs that get fetched, SpiderDuck’s median processing latency is under two seconds, and the 99th percentile processing latency is under five seconds. This latency is measured from Tweet creation time, which means that in under five seconds after a user clicked “Tweet,” the URL in that Tweet is extracted, prepared for fetch, all redirect hops are retrieved, the content is downloaded and parsed, and the metadata is extracted and made available to clients via the Metadata Store. Most of that time is spent either in the Fetcher Request Queues (due to rate limiting) or in actually fetching from the external web server. SpiderDuck itself adds no more than a few hundred milliseconds of processing overhead, most of which is spent in HTML parsing.

SpiderDuck’s Cassandra-based Metadata Store handles close to 10,000 requests per second. Each request is typically for a single URL or a small batch (around 20 URLs), but it also processes large batch requests (200-300 URLs). The store’s median latency for reads is 4-5 milliseconds, and its 99th percentile is 50-60 milliseconds.

Acknowledgements

The SpiderDuck core team consisted of the following folks: Abhi Khune, Michael Busch, Paul Burstein, Raghavendra Prabhu, Tian Wang and Yi Zhuang. In addition, we’d like to acknowledge the following folks, spanning many teams across the company, who contributed to the project either directly, by helping with components SpiderDuck relies on (for example, Cassandra, Finagle, Pacemaker and Scribe) or with its unique data center setup: Alan Liang, Brady Catherman, Chris Goffinet, Dmitriy Ryaboy, Gilad Mishne, John Corwin, John Sirois, Jonathan Boulle, Jonathan Reichhold, Marius Eriksen, Nick Kallen, Ryan King, Samuel Luckenbill, Steve Jiang, Stu Hood and Travis Crawford. Thanks also to the entire Twitter Search team for their invaluable design feedback and support. If you want to work on projects like this, join the flock!

Wednesday, September 14, 2011

Twitter’s mobile web app delivers performance

As the number of people using Twitter has grown, we've wanted to make sure that we deliver the best possible experience to users, regardless of platform or device. Since twitter.com is not optimized for smaller screens or the touch interactions familiar on many smartphones, we decided to build a cross-platform web application that felt native in its responsiveness and speed for those who prefer accessing Twitter in their phone's or tablet's browser.

A better mobile user experience

When building mobile.twitter.com as a web client, we used many of the tools offered in HTML5, CSS3, and JavaScript to develop an application that has the same look, feel, and performance of a native mobile application. This post focuses on four primary areas of the mobile app architecture that enabled us to meet our performance and usability goals:

  • event listeners
  • scroll views
  • templates
  • storage

Twitter's mobile app architecture

Event listeners

For the Twitter application to feel native, responses have to be immediate. The web application delivers this experience by using event listeners in its code.

Traditionally, JavaScript uses DOM-only events such as onclick, mouseover, mouseout, focus, and blur to drive interactions on a page. However, because Twitter has so many unique points of interaction, we decided to take fuller advantage of the capabilities mobile devices present to us. The web application we developed uses event listeners throughout the code. These synthetic events, loaded with the JavaScript on the client, listen for unique triggers that are fired following the users’ interactions. When users retweet or favorite Tweets, the JavaScript listens for those events and responds accordingly throughout the application, updating screen views where necessary.

The client-side JavaScript on the mobile application handles communication with Twitter through the Twitter API. To illustrate the use of event listeners, let’s look at how a Retweet works. When a user taps the Retweet button in the UI, the system fires a click event, which in turn triggers a Retweet request through the API.

The web client application listens for an event like a Retweet and updates the rest of the application when it receives it.

When that Retweet event is successful, a return event fires off a signal and the web app listens for a successful Retweet notification. When it receives the notification, the rest of the application updates appropriately.

The web app’s architecture ensures that while the user-facing layer for the various web apps may differ, the app reuses the custom event listeners throughout, thus making it possible to scale across all devices. For instance, both the iPhone and the iPad use the same views and modules, but in different navigation contexts, while the event architecture drives the rest of the application.
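
The custom-event model can be illustrated in miniature. This is a language-agnostic sketch (shown in Scala for brevity; the real app implements it in client-side JavaScript), and all names are invented.

```scala
// Minimal publish/subscribe event bus: views register listeners for named
// events such as "retweet:success", and any part of the app can fire them.
import scala.collection.mutable

object Events {
  private val listeners = mutable.Map.empty[String, List[String => Unit]]

  // Register a listener for an event name.
  def on(event: String)(f: String => Unit): Unit =
    listeners(event) = f :: listeners.getOrElse(event, Nil)

  // Fire an event; every registered listener receives the payload.
  def fire(event: String, payload: String): Unit =
    listeners.getOrElse(event, Nil).foreach(_(payload))
}
```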

ScrollViews

Mobile browsers use a viewport on top of the browser's window to allow the user to zoom and scroll the content of an entire page. As helpful as this is, the viewport prevents web pages from using fixed-positioned elements and custom scrolling gestures. Both of these enable a better user experience: the app header stays fixed, and more content fits in a smaller area.

We worked around the limitations of native scrolling by writing a ScrollView component that allows users to scroll the content using JavaScript and CSS Transforms and Transitions. CSS Transforms use the device's GPU to mimic the browser's viewport.

ScrollView adds a scrolling element and a wrapper container to the element that you wish to scroll. The wrapper container has a fixed width and height so that the inner contents can overflow. The JavaScript calculates the number of pixels that overflow and moves the scroll element using CSS Transforms.

ScrollView listens for three events, onTouchStart, onTouchMove, and onTouchEnd to render a smooth animation:

onTouchStart

The mobile site stores the initial touch position, timestamp and other variables that it will use later to calculate the distance and velocity of the scroll.

onTouchMove

Next, the web app simply moves the scroll element by the delta between the start and the current positions.

onTouchEnd

Finally, the web app checks whether the scroll element has moved. If there was no movement, the application fires a click event and stops the scrolling action. If the scroll element moved, it calculates the distance and the speed to generate inertial scrolling, and starts a timer.

When the timer fires, the application uses CSS Transforms to move the scroll element to the new position while it decreases the velocity logarithmically. Once the velocity reaches a minimum speed, the application cancels the timer and completes the animation. During this process, it takes into account important coordinates to calculate the elasticity when the user scrolls past the lower or the upper boundary of the scroll element.
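
The gesture-to-inertia math can be sketched as follows (in Scala for brevity; the real code is client-side JavaScript). The geometric decay here is a simplification of the decreasing-velocity animation described above, and every constant is invented.

```scala
// From the touch gesture's distance and duration, derive a velocity, then
// decay it on each animation frame until it drops below a threshold.
// All constants are illustrative, not the app's actual tuning values.
case class Gesture(distancePx: Double, durationMs: Double)

object Inertia {
  val decay       = 0.95    // per-frame velocity multiplier (assumed)
  val minVelocity = 0.01    // stop threshold, px/ms (assumed)

  def velocity(g: Gesture): Double = g.distancePx / g.durationMs

  // Total extra distance the content coasts after the finger lifts.
  def coastDistance(v0: Double, frameMs: Double = 16.0): Double = {
    var v = v0
    var d = 0.0
    while (math.abs(v) > minVelocity) { d += v * frameMs; v *= decay }
    d
  }
}
```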

ScrollView is used to specify which content is scrollable. It can also be used to fix the navigation header to the top of the window to implement Pull-To-Refresh and infinite Tweet timelines.

Templates

One of the many solutions customized to Twitter's user experience is its templating system. Templating is a two-pass process. During the first pass, the app expands the templates and marks the places in those resulting strings where dynamic data needs to go. The app then caches the results of the first pass. When it does a second pass to add dynamic data, the app references the cache, delivering a substantial performance benefit.
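
The two-pass idea can be sketched like this (in Scala for brevity; the app's templating is JavaScript, and the `{{name}}` placeholder syntax here is invented):

```scala
// Pass 1 expands and caches the static template; pass 2 substitutes dynamic
// data into the cached result. A real implementation would do more work in
// pass 1; this sketch keeps it to the caching structure.
object Templates {
  private val cache = scala.collection.mutable.Map.empty[String, String]

  // Pass 1: expand the static parts once and cache the result.
  def compile(name: String, template: String): String =
    cache.getOrElseUpdate(name, template)

  // Pass 2: fill the dynamic slots from the cached first-pass output.
  def render(name: String, data: Map[String, String]): String =
    data.foldLeft(cache(name)) { case (s, (k, v)) => s.replace("{{" + k + "}}", v) }
}
```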

Efficient storage

In addition to custom events, we reexamined the use of storage available to the web app from the native browser. Since 15 percent of all mobile applications are launched when the device is offline, the solution needed to cover both online and offline use. Twitter’s new mobile web app makes use of HTML5’s app cache, which allows you to specify which files the browser should cache and make available to offline users. Using the app cache also helps limit the amount of network activity. You specify in your manifest file what to store; these include items such as the master index file, sprites, and other assets. When a user loads a page, the web app serves the assets from the app cache, and it stores new assets when the manifest gets updated. This ensures the web app can be used even when the device is offline.
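
A cache manifest along these lines tells the browser what to store; the file names here are illustrative, not the app's actual assets.

```
CACHE MANIFEST
# version 1: assets the browser should cache for offline use
index.html
images/sprite.png
scripts/app.js
```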

The web app also uses local storage for simple items, such as user settings, user information, and strings, that are persistent throughout the application for immediate access. It uses a SQL database to handle Tweets and Profiles. Within the schema, each storage database gets a name based on the user, allowing for very quick joins between tables. Separate user tables allow for encapsulation and provide the ability to bundle data securely, by user. Given the growing use cases for devices like an iPad, especially in a multilingual setting, this innovation allows for two people using separate languages to receive all the translated strings cached per user on the same device.

In addition to using storage elements of the HTML5 spec, Twitter’s mobile application also makes use of some of the best tools of CSS3. This list includes

  • Flex box model
  • Gradients
  • Shadows
  • 3D transforms
  • Transitions
  • Animations

Future direction

The event framework gives us a scalable way to grow this product over time. Our goal is to add support for new devices as well as build new user-facing features and elements. We will invest in both native applications and the web. In cases where we can or should go native, we will, but in many cases we believe our web app provides an optimal approach for serving a broad set of users.

Acknowledgements

Twitter’s HTML5 mobile application was developed by Manuel Deschamps (@manuel) and designed by Bryan Haggerty (@bhaggs). Mark Percival (@mdp) contributed to the coding of the mobile architecture.

Friday, August 19, 2011

Finagle: A Protocol-Agnostic RPC System

Finagle is a protocol-agnostic, asynchronous RPC system for the JVM that makes it easy to build robust clients and servers in Java, Scala, or any JVM-hosted language.

Rendering even the simplest web page on twitter.com requires the collaboration of dozens of network services speaking many different protocols. For example, in order to render the home page, the application issues requests to the Social Graph Service, Memcached, databases, and many other network services. Each of these speaks a different protocol: Thrift, Memcached, MySQL, and so on. Additionally, many of these services speak to other services -- they are both servers and clients. The Social Graph Service, for instance, provides a Thrift interface but consumes from a cluster of MySQL databases.

In such systems, a frequent cause of outages is poor interaction between components in the presence of failures; common failures include crashed hosts and extreme latency variance. These failures can cascade through the system by causing work queues to back up, TCP connections to churn, or memory and file descriptors to become exhausted. In the worst case, the user sees a Fail Whale.

Challenges of building a stable distributed system

Sophisticated network servers and clients have many moving parts: failure detectors, load-balancers, failover strategies, and so on. These parts need to work together in a delicate balance to be resilient to the varieties of failure that occur in a large production system.

This is made especially difficult by the many different implementations of failure detectors, load-balancers, and so on, per protocol. For example, the implementation of the back-pressure strategies for Thrift differ from those for HTTP. Ensuring that heterogeneous systems converge to a stable state during an incident is extremely challenging.

Our approach

We set out to develop a single implementation of the basic components of network servers and clients that could be used for all of our protocols. Finagle is a protocol-agnostic, asynchronous Remote Procedure Call (RPC) system for the Java Virtual Machine (JVM) that makes it easy to build robust clients and servers in Java, Scala, or any JVM-hosted language. Finagle supports a wide variety of request/response-oriented RPC protocols and many classes of streaming protocols.

Finagle provides a robust implementation of:

  • connection pools, with throttling to avoid TCP connection churn;
  • failure detectors, to identify slow or crashed hosts;
  • failover strategies, to direct traffic away from unhealthy hosts;
  • load-balancers, including “least-connections” and other strategies; and
  • back-pressure techniques, to defend servers against abusive clients and dogpiling.

Additionally, Finagle makes it easier to build and deploy a service that

  • publishes standard statistics, logs, and exception reports;
  • supports distributed tracing (a la Dapper) across protocols;
  • optionally uses ZooKeeper for cluster management; and
  • supports common sharding strategies.

We believe our work has paid off -- we can now write and deploy a network service with much greater ease and safety.

Finagle at Twitter

Today, Finagle is deployed in production at Twitter in several front- and back-end serving systems, including our URL crawler and HTTP Proxy. We plan to continue deploying Finagle more widely.

A Finagle-based architecture (under development)

The diagram illustrates a future architecture that uses Finagle pervasively. For example, the User Service is a Finagle server that uses a Finagle memcached client, and speaks to a Finagle Kestrel service.

How Finagle works

Finagle is flexible and easy to use because it is designed around a few simple, composable primitives: Futures, Services, and Filters.

Future objects

In Finagle, Future objects are the unifying abstraction for all asynchronous computation. A Future represents a computation that may not yet have completed and that can either succeed or fail. The two most basic ways to use a Future are to:

  • block and wait for the computation to return
  • register a callback to be invoked when the computation eventually succeeds or fails

Future callbacks

In cases where execution should continue asynchronously upon completion of a computation, you can specify a success and a failure callback. Callbacks are registered via the onSuccess and onFailure methods:
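
The original snippet is missing here; a sketch along its lines, using com.twitter.util.Future. User, request, and User.authenticate are hypothetical stand-ins for a real service call.

```scala
// Registering callbacks on a Future. User.authenticate is a hypothetical
// asynchronous call returning Future[User].
import com.twitter.util.Future

val userFuture: Future[User] = User.authenticate(request)

userFuture onSuccess { user =>
  println("Authenticated " + user)
} onFailure { cause =>
  println("Authentication failed: " + cause)
}
```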

Composing Futures

Futures can be combined and transformed in interesting ways, leading to the kind of compositional behavior commonly seen in functional programming languages. For instance, you can convert a Future[String] to a Future[Int] by using map:
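
The snippet is missing here; a sketch using the twitter-util Future API:

```scala
// Transforming a Future[String] into a Future[Int] with map.
import com.twitter.util.Future

val str: Future[String] = Future.value("42")
val int: Future[Int]    = str map { s => s.toInt }
```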

Similarly, you can use flatMap to easily pipeline a sequence of Futures:
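
The snippet is missing here; a sketch of the pipeline, where User.authenticate and Tweet.findAllByUser are the hypothetical service calls named in the surrounding text:

```scala
// Pipelining two asynchronous calls: authenticate the user, then look up
// that user's Tweets. The second call runs only when the first succeeds.
val tweets: Future[Seq[Tweet]] =
  User.authenticate(request) flatMap { user =>
    Tweet.findAllByUser(user)
  }
```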

In this example, User.authenticate() is performed asynchronously; Tweet.findAllByUser() is invoked on its eventual result. This is alternatively expressed in Scala, using the for statement:
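
The for-statement version is missing here; a sketch of the same pipeline:

```scala
// The same authenticate-then-fetch pipeline, expressed with Scala's
// for syntax (which desugars to flatMap and map).
val tweets: Future[Seq[Tweet]] = for {
  user <- User.authenticate(request)
  ts   <- Tweet.findAllByUser(user)
} yield ts
```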

Handling errors and exceptions is very easy when Futures are pipelined using flatMap or the for statement. In the above example, if User.authenticate() asynchronously raises an exception, the subsequent call to Tweet.findAllByUser() never happens. Instead, the result of the pipelined expression is still of the type Future[Seq[Tweet]], but it contains the exceptional value rather than tweets. You can respond to the exception using the onFailure callback or other compositional techniques.

A nice property of Futures, as compared to other asynchronous programming techniques (such as continuation-passing style), is that you can easily write clear and robust asynchronous code, even with more sophisticated operations such as scatter/gather:
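
The scatter/gather snippet is missing here; a sketch using Future.collect, with `urls` and `fetch` as hypothetical stand-ins:

```scala
// Scatter: issue all requests at once. Gather: collect the individual
// Futures into one Future that succeeds only when every fetch succeeds.
import com.twitter.util.Future

val fetches: Seq[Future[Result]] = urls map { url => fetch(url) }
val all: Future[Seq[Result]]     = Future.collect(fetches)
```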

Service objects

A Service is a function that receives a request and returns a Future object as a response. Note that both clients and servers are represented as Service objects.

To create a Server, you extend the abstract Service class and listen on a port. Here is a simple HTTP server listening on port 10000:
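
The server snippet is missing here; a sketch close to the Finagle examples of the time (Netty 3 HTTP types; the exact builder API varies across releases):

```scala
// A minimal Finagle HTTP server on port 10000 that answers every request
// with an empty 200 OK.
import java.net.InetSocketAddress
import com.twitter.finagle.Service
import com.twitter.finagle.builder.ServerBuilder
import com.twitter.finagle.http.Http
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http._

val service = new Service[HttpRequest, HttpResponse] {
  def apply(request: HttpRequest): Future[HttpResponse] =
    Future.value(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK))
}

val server = ServerBuilder()
  .codec(Http())
  .bindTo(new InetSocketAddress(10000))
  .name("httpserver")
  .build(service)
```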

Building an HTTP client is even easier:
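
The client snippet is missing here; a sketch in the same era's builder style:

```scala
// A load-balanced Finagle HTTP client; the resulting client is itself a
// Service that turns requests into Futures of responses.
import com.twitter.finagle.Service
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http
import org.jboss.netty.handler.codec.http._

val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
  .codec(Http())
  .hosts("twitter.com:80")
  .hostConnectionLimit(1)
  .build()
```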

Filter objects

Filters are a useful way to isolate distinct phases of your application into a pipeline. For example, you may need to handle exceptions, authorization, and so forth before your Service responds to a request.

A Filter wraps a Service and, potentially, converts the input and output types of the Service to other types. In other words, a Filter is a Service transformer. Here is a filter that uses an asynchronous authenticator service to ensure that an HTTP request has valid OAuth credentials:
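
The filter snippet is missing here; a sketch of its shape, where the request, response, and authenticator types (Req, AuthReq, Rep, AuthResult) are illustrative:

```scala
// An authorization filter: it converts plain requests into authenticated
// ones, so its input type differs from the type the wrapped Service sees.
import com.twitter.finagle.{Filter, Service}
import com.twitter.util.Future

class RequireAuthentication(authenticator: Service[Req, AuthResult])
    extends Filter[Req, Rep, AuthReq, Rep] {

  def apply(request: Req, service: Service[AuthReq, Rep]): Future[Rep] =
    authenticator(request) flatMap { result =>
      if (result.ok) service(AuthReq(request, result.passport))
      else Future.exception(new RequestUnauthenticated)
    }
}
```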

A Filter then decorates a Service, as in this example:
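
The decoration snippet is missing here; a sketch of composing filters with a service via andThen, with illustrative names:

```scala
// Filters compose with andThen; the result is itself a Service, so it can
// be passed straight to ServerBuilder's build().
val handleExceptions = new HandleExceptions
val authorize        = new RequireAuthentication(authenticator)

val myService: Service[Req, Rep] =
  handleExceptions andThen authorize andThen respond
```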

Finagle is an open source project, available under the Apache License, Version 2.0. Source code and documentation are available on GitHub.

Acknowledgements

Finagle was originally conceived by Marius Eriksen and Nick Kallen. Other key contributors are Arya Asemanfar, David Helder, Evan Meagher, Gary McCue, Glen Sanford, Grant Monroe, Ian Ownbey, Jake Donham, James Waldrop, Jeremy Cloud, Johan Oskarsson, Justin Zhu, Raghavendra Prabhu, Robey Pointer, Ryan King, Sam Whitlock, Steve Jenson, Wanli Yang, Wilhelm Bierbaum, William Morgan, Abhi Khune, and Srini Rajagopal.

Thursday, August 4, 2011

A Storm is coming: more details and plans for release

We've received a lot of questions about what's going to happen to Storm now that BackType has been acquired by Twitter. I'm pleased to announce that I will be releasing Storm at Strange Loop on September 19th! Check out the session info for more details.

In my preview post about Storm, I discussed how Storm can be applied to a huge variety of realtime computation problems. In this post, I'll give more details on Storm and what it's like to use.

Here's a recap of the three broad use cases for Storm:

  1. Stream processing: Storm can be used to process a stream of new data and update databases in realtime. Unlike the standard approach of doing stream processing with a network of queues and workers, Storm is fault-tolerant and scalable.
  2. Continuous computation: Storm can do a continuous query and stream the results to clients in realtime. An example is streaming trending topics on Twitter into browsers. The browsers will have a realtime view on what the trending topics are as they happen.
  3. Distributed RPC: Storm can be used to parallelize an intense query on the fly. The idea is that your Storm topology is a distributed function that waits for invocation messages. When it receives an invocation, it computes the query and sends back the results. Examples of Distributed RPC are parallelizing search queries or doing set operations on large numbers of large sets.

The beauty of Storm is that it's able to solve such a wide variety of use cases with just a simple set of primitives.

Components of a Storm cluster

A Storm cluster is superficially similar to a Hadoop cluster. Whereas on Hadoop you run "MapReduce jobs", on Storm you run "topologies". "Jobs" and "topologies" themselves are very different -- one key difference is that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you kill it).

There are two kinds of nodes on a Storm cluster: the master node and the worker nodes. The master node runs a daemon called "Nimbus" that is similar to Hadoop's "JobTracker". Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.

Each worker node runs a daemon called the "Supervisor". The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

All coordination between Nimbus and the Supervisors is done through a ZooKeeper cluster. Additionally, the Nimbus daemon and Supervisor daemons are fail-fast and stateless; all state is kept in ZooKeeper or on local disk. This means you can kill -9 Nimbus or the Supervisors and they'll start back up like nothing happened. This design makes Storm clusters incredibly stable. We've had topologies running for months without requiring any maintenance.

Running a Storm topology

Running a topology is straightforward. First, you package all your code and dependencies into a single jar. Then, you run a command like the following:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

This runs the class backtype.storm.MyTopology with the arguments arg1 and arg2. The main function of the class defines the topology and submits it to Nimbus. The storm jar part takes care of connecting to Nimbus and uploading the jar.

Since topology definitions are just Thrift structs, and Nimbus is a Thrift service, you can create and submit topologies using any programming language. The above example is the easiest way to do it from a JVM-based language.

Streams and Topologies

Let's dig into the abstractions Storm exposes for doing scalable realtime computation. After I go over the main abstractions, I'll tie everything together with a concrete example of a Storm topology.

The core abstraction in Storm is the "stream". A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.

The basic primitives Storm provides for doing stream transformations are "spouts" and "bolts". Spouts and bolts have interfaces that you implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.

A bolt does single-step stream transformations. It creates new streams based on its input streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts.

Multi-step stream transformations are packaged into a "topology" which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.

Everything in Storm runs in parallel in a distributed way. Spouts and bolts execute as many threads across the cluster, and they pass messages to each other in a distributed fashion. Messages never pass through any sort of central router, and there are no intermediate queues. A tuple is passed directly from the thread that created it to the threads that need to consume it.

Storm guarantees that every message flowing through a topology will be processed, even if a machine goes down and the messages it was processing get dropped. How Storm accomplishes this without any intermediate queuing is the key to how it works and what makes it so fast.

Let's look at a concrete example of spouts, bolts, and topologies to solidify the concepts.

A simple example topology

The example topology I'm going to show is "streaming word count". The topology contains a spout that emits sentences, and the final bolt emits the number of times each word has appeared across all sentences. Every time the count for a word is updated, a new count is emitted for it. The topology looks like this:

Here's how you define this topology in Java:

The spout for this topology reads sentences off of the "sentence_queue" on a Kestrel server located at kestrel.backtype.com on port 22133.

The spout is inserted into the topology with a unique id using the setSpout method. Every node in the topology must be given an id, and the id is used by other bolts to subscribe to that node's output streams. The KestrelSpout is given the id "1" in this topology.

setBolt is used to insert bolts in the topology. The first bolt defined in this topology is the SplitSentence bolt. This bolt transforms a stream of sentences into a stream of words. Let's take a look at the implementation of SplitSentence:

The key method is the execute method. As you can see, it splits the sentence into words and emits each word as a new tuple. Another important method is declareOutputFields, which declares the schema for the bolt's output tuples. Here it declares that it emits 1-tuples with a field called "word".

Bolts can be implemented in any language. Here is the same bolt implemented in Python:
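The original post showed the Python bolt as an image. The following self-contained sketch mirrors that logic; in a real deployment the bolt would run as a subprocess and talk to its parent worker over Storm's multilang protocol, with emitted tuples sent back to Storm rather than, as here, collected in a list:

```python
# Sketch of a Python SplitSentence bolt. In Storm, a non-JVM bolt runs as a
# subprocess and communicates with its worker over stdin/stdout; emit() would
# hand the tuple back to Storm. Here, emits are collected in a list so the
# logic stands alone.
emitted = []

def emit(tup):
    emitted.append(tup)

class SplitSentenceBolt:
    def process(self, tup):
        # tup is a 1-tuple with the field "sentence"; emit one 1-tuple
        # with the field "word" for each word in the sentence.
        for word in tup[0].split(" "):
            emit([word])

bolt = SplitSentenceBolt()
bolt.process(["the cow jumped over the moon"])
print(emitted)  # [['the'], ['cow'], ['jumped'], ['over'], ['the'], ['moon']]
```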

The last parameter to setBolt is the amount of parallelism you want for the bolt. The SplitSentence bolt is given a parallelism of 10 which will result in 10 threads executing the bolt in parallel across the Storm cluster. To scale a topology, all you have to do is increase the parallelism for the bolts at the bottleneck of the topology.

The setBolt method returns an object that you use to declare the inputs for the bolt. Continuing with the example, the SplitSentence bolt subscribes to the output stream of component "1" using a shuffle grouping. "1" refers to the KestrelSpout that was already defined. I'll explain the shuffle grouping part in a moment. What matters so far is that the SplitSentence bolt will consume every tuple emitted by the KestrelSpout.

A bolt can subscribe to multiple input streams by chaining input declarations, like so:

You would use this functionality to implement a streaming join, for instance.

The final bolt in the streaming word count topology, WordCount, reads in the words emitted by SplitSentence and emits updated counts for each word. Here's the implementation of WordCount: it maintains a map in memory from word to count. Whenever it sees a word, it updates the count for that word in its internal map and then emits the updated count as a new tuple. Finally, in declareOutputFields the bolt declares that it emits a stream of 2-tuples with the fields "word" and "count".
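The WordCount code itself appeared as an image in the original post. A minimal Python sketch of the logic just described (an in-memory map, with an updated count emitted per word) looks like this:

```python
# Sketch of the WordCount bolt's logic: keep an in-memory map from word to
# count, and emit an updated ["word", count] 2-tuple every time a word is seen.
emitted = []

class WordCountBolt:
    def __init__(self):
        self.counts = {}

    def execute(self, tup):
        word = tup[0]
        self.counts[word] = self.counts.get(word, 0) + 1
        emitted.append([word, self.counts[word]])

bolt = WordCountBolt()
for w in ["the", "cow", "the"]:
    bolt.execute([w])
print(emitted)  # [['the', 1], ['cow', 1], ['the', 2]]
```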

The internal map kept in memory will be lost if the task dies. If it's important that the bolt's state persist even when a task dies, you can use an external database like Riak, Cassandra, or Memcached to store the word counts. An in-memory HashMap is used here for simplicity.

Finally, the WordCount bolt declares its input as coming from component 2, the SplitSentence bolt. It consumes that stream using a "fields grouping" on the "word" field.

"Fields grouping", like the "shuffle grouping" that I glossed over before, is a type of "stream grouping". "Stream groupings" are the final piece that ties topologies together.

Stream groupings

A stream grouping tells a topology how to send tuples between two components. Remember, spouts and bolts execute in parallel as many tasks across the cluster. If you look at how a topology is executing at the task level, it looks something like this:

When a task for Bolt A emits a tuple to Bolt B, which task should it send the tuple to?

A "stream grouping" answers this question by telling Storm how to send tuples between sets of tasks. There are a few different kinds of stream groupings.

The simplest kind of grouping is called a "shuffle grouping", which sends the tuple to a random task. A shuffle grouping is used in the streaming word count topology to send tuples from KestrelSpout to the SplitSentence bolt. It has the effect of evenly distributing the work of processing the tuples across all of the SplitSentence bolt's tasks.

A more interesting kind of grouping is the "fields grouping". A fields grouping is used between the SplitSentence bolt and the WordCount bolt. It is critical for the functioning of the WordCount bolt that the same word always go to the same task. Otherwise, more than one task will see the same word, and they'll each emit incorrect values for the count since each has incomplete information. A fields grouping lets you group a stream by a subset of its fields. This causes equal values for that subset of fields to go to the same task. Since WordCount subscribes to SplitSentence's output stream using a fields grouping on the "word" field, the same word always goes to the same task and the bolt produces the correct output.

Fields groupings are the basis for implementing streaming joins and streaming aggregations, as well as a plethora of other use cases. Under the hood, fields groupings are implemented using consistent hashing.
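The routing property a fields grouping provides can be sketched with a simple hash-modulo stand-in for Storm's consistent hashing: hash the grouping fields and map the result onto the consumer's tasks, so equal field values always land on the same task.

```python
# Sketch of fields-grouping routing: hash the grouping field and map it onto
# the consumer's tasks. Storm uses consistent hashing; a simple modulo is a
# stand-in that shows the key property.
NUM_TASKS = 10  # parallelism of the consuming bolt

def fields_grouping_task(tup, field_index=0):
    # Route on the "word" field (index 0 of the tuple).
    return hash(tup[field_index]) % NUM_TASKS

a = fields_grouping_task(["storm", 1])
b = fields_grouping_task(["storm", 2])
print(a == b)  # True: the same word always goes to the same task
```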

There are a few other kinds of groupings, but talking about those is beyond the scope of this post.

With that, you should now have everything you need to understand the streaming word count topology. The topology doesn't require that much code, and it's completely scalable and fault-tolerant. Whether you're processing 10 messages per second or 100K messages per second, this topology can scale up or down as necessary by just tweaking the amount of parallelism for each component.
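To make the data flow concrete, here is a toy single-process simulation of the word count topology (not Storm's API, and with none of Storm's parallelism or fault tolerance; just the flow of tuples from spout to bolts):

```python
# Toy single-process simulation of the streaming word count topology. In a
# real Storm cluster the spout and bolts run as many parallel tasks across
# machines, with stream groupings deciding which task receives each tuple.
class SentenceSpout:
    def __init__(self, sentences):
        self.sentences = sentences

    def next_tuple(self):
        # Emit the next sentence, or None when the toy stream is exhausted.
        return self.sentences.pop(0) if self.sentences else None

def split_bolt(sentence):
    # SplitSentence: one sentence in, a list of words out.
    return sentence.split(" ")

counts = {}

def count_bolt(word):
    # WordCount: update the running total for the word.
    counts[word] = counts.get(word, 0) + 1

spout = SentenceSpout(["the cow jumped", "the moon"])
while True:
    sentence = spout.next_tuple()
    if sentence is None:
        break
    for word in split_bolt(sentence):
        count_bolt(word)

print(counts)  # {'the': 2, 'cow': 1, 'jumped': 1, 'moon': 1}
```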

The complexity that Storm hides

The abstractions that Storm provides are ultimately pretty simple. A topology is composed of spouts and bolts that you connect together with stream groupings to get data flowing. You specify how much parallelism you want for each component, package everything into a jar, submit the topology and code to Nimbus, and Storm keeps your topology running forever. Here's a glimpse at what Storm does underneath the hood to implement these abstractions in an extremely robust way.

  1. Guaranteed message processing: Storm guarantees that each tuple coming off a spout will be fully processed by the topology. To do this, Storm tracks the tree of messages that a tuple triggers. If a tuple fails to be fully processed, Storm will replay the tuple from the Spout. Storm incorporates some clever tricks to track the tree of messages in an efficient way.

  2. Robust process management: One of Storm's main tasks is managing processes around the cluster. When a new worker is assigned to a supervisor, that worker should be started as quickly as possible. When that worker is no longer assigned to that supervisor, it should be killed and cleaned up.

    An example of a system that does this poorly is Hadoop. When Hadoop launches a task, the burden for the task to exit is on the task itself. Unfortunately, tasks sometimes fail to exit and become orphan processes, sucking up memory and resources from other tasks.

    In Storm, the burden of killing a worker process is on the supervisor that launched it. Orphaned tasks simply cannot happen with Storm, no matter how much you stress the machine or how many errors there are. Accomplishing this is tricky because Storm needs to track not just the worker processes it launches, but also subprocesses launched by the workers (a subprocess is launched when a bolt is written in another language).

    The Nimbus daemon and Supervisor daemons are stateless and fail-fast. If they die, the running topologies aren't affected. The daemons just start back up like nothing happened. This is again in contrast to how Hadoop works.

  3. Fault detection and automatic reassignment: Tasks in a running topology heartbeat to Nimbus to indicate that they are running smoothly. Nimbus monitors heartbeats and will reassign tasks that have timed out. Additionally, all the tasks throughout the cluster that were sending messages to the failed tasks quickly reconnect to the new location of the tasks.

  4. Efficient message passing: No intermediate queuing is used for message passing between tasks. Instead, messages are passed directly between tasks using ZeroMQ. This is simpler and way more efficient than using intermediate queuing. ZeroMQ is a clever "super-socket" library that employs a number of tricks for maximizing the throughput of messages. For example, it will detect if the network is busy and automatically batch messages to the destination.

    Another important part of message passing between processes is serializing and deserializing messages in an efficient way. Again, Storm automates this for you. By default, you can use any primitive type, strings, or binary records within tuples. If you want to be able to use another type, you just need to implement a simple interface to tell Storm how to serialize it. Then, whenever Storm encounters that type, it will automatically use that serializer.
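    That per-type registration pattern can be sketched as follows (this is not Storm's actual interface; the `Point` type and the registry functions are made up for illustration):

```python
# Sketch of a per-type serializer registry: register a pair of functions for
# a custom type, then dispatch on the value's type when a tuple crosses a
# process boundary. Not Storm's actual interface, just the general idea.
import struct

SERIALIZERS = {}  # type -> (serialize, deserialize)

def register(typ, serialize_fn, deserialize_fn):
    SERIALIZERS[typ] = (serialize_fn, deserialize_fn)

class Point:  # a hypothetical custom type carried inside tuples
    def __init__(self, x, y):
        self.x, self.y = x, y

register(
    Point,
    lambda p: struct.pack(">dd", p.x, p.y),        # to bytes
    lambda b: Point(*struct.unpack(">dd", b)),     # from bytes
)

def serialize(value):
    ser, _ = SERIALIZERS[type(value)]
    return ser(value)

def deserialize(typ, data):
    _, de = SERIALIZERS[typ]
    return de(data)

p = deserialize(Point, serialize(Point(1.5, -2.0)))
print(p.x, p.y)  # 1.5 -2.0
```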

  5. Local mode and distributed mode: Storm has a "local mode" where it simulates a Storm cluster completely in-process. This lets you iterate on your topologies quickly and write unit tests for your topologies. You can run the same code in local mode as you run on the cluster.

Storm is easy to use, configure, and operate. It is accessible for everyone from the individual developer processing a few hundred messages per second to the large company processing hundreds of thousands of messages per second.

Relation to “Complex Event Processing”

Storm exists in the same space as “Complex Event Processing” systems like Esper, Streambase, and S4. Among these, the most closely comparable system is S4. The biggest difference between Storm and S4 is that Storm guarantees messages will be processed even in the face of failures whereas S4 will sometimes lose messages.

Some CEP systems have a built-in data storage layer. With Storm, you would use an external database like Cassandra or Riak alongside your topologies. It’s impossible for one data storage system to satisfy all applications since different applications have different data models and access patterns. Storm is a computation system and not a storage system. However, Storm does have some powerful facilities for achieving data locality even when using an external database.

Summary

I've only scratched the surface on Storm. The "stream" concept at the core of Storm can be taken so much further than what I've shown here -- I didn't talk about things like multi-streams, implicit streams, or direct groupings. I showed two of Storm's main abstractions, spouts and bolts, but I didn't talk about Storm's third, and possibly most powerful abstraction, the "state spout". I didn't show how you do distributed RPC over Storm, and I didn't discuss Storm's awesome automated deploy that lets you create a Storm cluster on EC2 with just the click of a button.

For all that, you're going to have to wait until September 19th. Until then, I will be working on adding documentation to Storm so that you can get up and running with it quickly once it's released. We're excited to release Storm, and I hope to see you at Strange Loop.

- Nathan Marz (@nathanmarz)

Friday, July 1, 2011

Fast Core Animation UI for the Mac

Starting today, Twitter is offering TwUI as an open-source framework (https://github.com/twitter/twui) for developing interfaces on the Mac.

Until now, there was not a simple and effective way to design interactive, hardware-accelerated interfaces on the Mac. Core Animation can create hardware-accelerated drawings, but doesn't provide interaction mechanisms. AppKit and NSView have excellent interaction mechanisms, but the drawing operations are CPU-bound, which makes fluid scrolling, animations, and other effects difficult – if not impossible – to accomplish.

UIKit on Apple’s iOS platform has offered developers a fresh start. While UIKit borrows many ideas from AppKit regarding interaction, it can offload compositing to the GPU because it is built on top of Core Animation. This architecture has enabled developers to create many applications that were previously impossible to build.

TwUI as a solution

TwUI brings the philosophy of UIKit to the desktop. It is built on top of Core Animation, and it borrows interaction ideas from AppKit. It allows for all the things Mac users expect, including drag & drop, mouse events, tooltips, Mac-like text selection, and so on. And, since TwUI isn’t bound by the constraints of an existing API, developers can experiment with new features like block-based drawRect and layout.

How TwUI works

You will recognize the fundamentals of TwUI if you are familiar with UIKit. For example, a "TUIView" is a simple, lightweight wrapper around a Core Animation layer – much like UIView on iOS.

TUIView offers useful subclasses for operations such as scroll views, table views, buttons, and so on. More importantly, TwUI makes it easy to build your own custom interface components. And because all of these views are backed by layers, composited by Core Animation, your UI is rendered at optimal speed.

Xcode running the TwUI example project

Ongoing development

Since TwUI forms the basis of Twitter for the Mac, it is an integral part of our shipping code. Going forward, we need to stress test it in several implementations. We’ll continue to develop additional features and make improvements. And, we encourage you to experiment, as that will help us build a robust and exciting UI framework for the Mac.

Acknowledgements

The following engineers were mainly responsible for the TwUI development:

-Loren Brichter (@lorenb), Ben Sandofsky (@sandofsky)