Announcing Pulsar: Real-time Analytics at Scale

We are happy to announce Pulsar – an open-source, real-time analytics platform and stream processing framework. Pulsar can be used to collect and process user and business events in real time, providing key insights and enabling systems to react to user activities within seconds. In addition to real-time sessionization and multi-dimensional metrics aggregation over time windows, Pulsar uses a SQL-like event processing language to offer custom stream creation through data enrichment, mutation, and filtering. Pulsar scales to a million events per second with high availability. It can be easily integrated with metrics stores like Cassandra and Druid.

Why Pulsar

eBay provides a platform that enables millions of buyers and sellers to conduct commerce transactions. To help optimize eBay end users’ experience, we perform analysis of user interactions and behaviors. Over the past years, batch-oriented data platforms like Hadoop have been used successfully for user behavior analytics. More recently, we have newer use cases that demand collection and processing of vast numbers of events in near real time (within seconds), in order to derive actionable insights and generate signals for immediate action. Here are examples of such use cases:

  • Real-time reporting and dashboards
  • Business activity monitoring
  • Personalization
  • Marketing and advertising
  • Fraud and bot detection

We identified a set of systemic qualities that are important to support these large-scale, real-time analytics use cases:

  • Scalability - Scaling to millions of events per second
  • Latency - Sub-second event processing and delivery
  • Availability - No cluster downtime during software upgrades, stream processing rule updates, and topology changes
  • Flexibility - Ease in defining and changing processing logic, event routing, and pipeline topology
  • Productivity - Support for complex event processing (CEP) and a 4GL language for data filtering, mutation, aggregation, and stateful processing
  • Data accuracy - 99.9% data delivery
  • Cloud deployability - Node distribution across data centers using standard cloud infrastructure

Given our unique set of requirements, we decided to develop our own distributed CEP framework. Pulsar CEP provides a Java-based framework as well as tooling to build, deploy, and manage CEP applications in a cloud environment. Pulsar CEP includes the following capabilities:

  • Declarative definition of processing logic in SQL
  • Hot deployment of SQL without restarting applications
  • Annotation plugin framework to extend SQL functionality
  • Pipeline flow routing using SQL
  • Dynamic creation of stream affinity using SQL
  • Declarative pipeline stitching using Spring IOC, thereby enabling dynamic topology changes at runtime
  • Clustering with elastic scaling
  • Cloud deployment
  • Publish-subscribe messaging with both push and pull models
  • Additional CEP capabilities through Esper integration

On top of this CEP framework, we implemented a real-time analytics data pipeline.

Pulsar real-time analytics pipeline

Pulsar's real-time analytics data pipeline consists of loosely coupled stages, each functionally separate from its neighbors. Events are transported asynchronously from one stage to the next. This model provides higher reliability and scalability. Each stage can be built and operated independently from its neighboring stages, and can adopt its own deployment and release cycles. The topology can be changed without restarting the cluster.

pulsar_pipeline

Here is some of the processing we perform in our real-time analytics pipeline:

  • Enrichment - Decorate events with additional attributes. For example, we can add geo location information to user interaction events based on the IP address range.
  • Filtering and mutation - Filter out irrelevant attributes and events, or transform the content of an event.
  • Aggregation - Count the number of events, or add up metrics along a set of dimensions over a time window.
  • Stateful processing - Group multiple events into one, or generate a new event based on a sequence of events and processing rules. An example is our sessionization stage, which tracks user session-based metrics by grouping a sequence of user interaction events into web sessions.
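
As an illustration of the stateful processing described in the last item, here is a minimal Python sketch of gap-based sessionization. It is not Pulsar's implementation: the 30-minute inactivity gap, the `Session` fields, and the function name are assumptions made for the example.

```python
from dataclasses import dataclass

# Assumption for illustration: a session ends after 30 minutes of inactivity.
SESSION_GAP_SECONDS = 30 * 60


@dataclass
class Session:
    user_id: str
    start: float
    end: float
    event_count: int = 1


def sessionize(events, gap=SESSION_GAP_SECONDS):
    """Group (user_id, timestamp) events into per-user sessions.

    Events must arrive in timestamp order. A new session starts whenever
    the time since the user's previous event exceeds `gap` seconds.
    """
    open_sessions = {}  # user_id -> session currently being extended
    closed = []
    for user_id, ts in events:
        current = open_sessions.get(user_id)
        if current is not None and ts - current.end <= gap:
            # Same session: extend it and count the event.
            current.end = ts
            current.event_count += 1
        else:
            # Gap exceeded (or first event): close the old session, open a new one.
            if current is not None:
                closed.append(current)
            open_sessions[user_id] = Session(user_id, ts, ts)
    closed.extend(open_sessions.values())
    return closed


events = [("u1", 0), ("u2", 10), ("u2", 20), ("u1", 60), ("u1", 4000)]
sessions = sessionize(events)
for s in sessions:
    print(s)
```

In a real pipeline the open sessions would live in per-key state on whichever node owns that user, which is why events are partitioned by a key such as user ID.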

The Pulsar pipeline can be integrated with different systems. For example, summarized events can be sent to a persistent metrics store to support ad-hoc queries. Events can also be sent to some form of visualization dashboard for real-time reporting, or to backend systems that can react to event signals.

A taste of complex event processing

In Pulsar, our approach is to treat the event stream like a database table. We apply SQL queries and annotations on live streams to extract summary data as events are moving.

The following are a few examples of how common processing can be expressed in Pulsar.

Event filtering and routing

insert into SUBSTREAM select D1, D2, D3, D4
from RAWSTREAM where D1 = 2045573 or D2 = 2047936 or D3 = 2051457 or D4 = 2053742; // filtering
@PublishOn(topics="TOPIC1")   // publish sub stream at TOPIC1
@OutputTo("OutboundMessageChannel")
@ClusterAffinityTag(column = D1);    // partition key based on column D1
select * FROM SUBSTREAM;

Aggregate computation

// create 10-second time window context
create context MCContext start @now end pattern [timer:interval(10)];
// aggregate event count along dimension D1 and D2 within specified time window
context MCContext insert into AGGREGATE select count(*) as METRIC1, D1, D2 FROM RAWSTREAM group by D1,D2 output snapshot when terminated;
select * from AGGREGATE;
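
For readers unfamiliar with EPL, the following Python sketch shows roughly what the statement above computes: an event count per (D1, D2) pair within consecutive 10-second windows. It is an illustration only. The event layout (`ts`, `D1`, `D2` keys) is assumed, and the windows here are aligned to multiples of 10 seconds rather than to the moment the statement is deployed.

```python
from collections import Counter, defaultdict

WINDOW_SECONDS = 10  # mirrors the 10-second context in the EPL above


def aggregate(events, window=WINDOW_SECONDS):
    """Count events per (D1, D2) within consecutive fixed-size windows.

    `events` is an iterable of dicts with keys "ts" (seconds), "D1", "D2".
    Returns {window_start: Counter({(D1, D2): count})}.
    """
    windows = defaultdict(Counter)
    for event in events:
        window_start = int(event["ts"] // window) * window
        windows[window_start][(event["D1"], event["D2"])] += 1
    return dict(windows)


raw_stream = [
    {"ts": 1, "D1": "us", "D2": "mobile"},
    {"ts": 4, "D1": "us", "D2": "mobile"},
    {"ts": 7, "D1": "de", "D2": "web"},
    {"ts": 12, "D1": "us", "D2": "mobile"},
]
result = aggregate(raw_stream)
for start, counts in sorted(result.items()):
    for (d1, d2), metric1 in sorted(counts.items()):
        print(start, d1, d2, metric1)
```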

TopN computation

// create 60-second time window context
create context MCContext start @now end pattern [timer:interval(60)];
// sort to find top 10 event counts along dimensions D1, D2, and D3
// within specified time window
context MCContext insert into TOPITEMS select count(*) as totalCount, D1, D2, D3 from RawEventStream group by D1, D2, D3 order by count(*) desc limit 10;
select * from TOPITEMS;

Pulsar deployment architecture

pulsar_deployment

Pulsar CEP processing logic is deployed on many nodes (CEP cells) across data centers. Each CEP cell is configured with an inbound channel, outbound channel, and processing logic. Events are typically partitioned based on a key such as user ID. All events with the same partition key are routed to the same CEP cell. In each stage, events can be partitioned based on a different key, enabling aggregation across multiple dimensions. To scale to more events, we just need to add more CEP cells into the pipeline. Using Apache ZooKeeper, Pulsar CEP automatically detects the new cell and rebalances the event traffic. Similarly, if a CEP cell goes down, Pulsar CEP will reroute traffic to other nodes.
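
The routing idea can be sketched in a few lines of Python. This shows key-based partitioning in general, not Pulsar's actual routing code: the cell names, event fields, and the simple hash-modulo scheme are assumptions. A production system would more likely use something like consistent hashing, so that adding or removing a cell moves only a fraction of the keys.

```python
import hashlib


def cell_for_key(key, cells):
    """Pick a cell for a partition key using a stable hash of the key."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    return cells[int.from_bytes(digest[:8], "big") % len(cells)]


cells = ["cell-a", "cell-b", "cell-c"]  # hypothetical cell names
events = [
    {"user_id": "u1", "page": "home"},
    {"user_id": "u2", "page": "home"},
    {"user_id": "u1", "page": "item"},
]

# One stage partitions by user ID; another stage can partition by a different key.
by_user = [cell_for_key(e["user_id"], cells) for e in events]
by_page = [cell_for_key(e["page"], cells) for e in events]
print(by_user)
print(by_page)
```

Because the hash is deterministic, both events for user `u1` land on the same cell in the first stage, and both `home` events land on the same cell in the second.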

Pulsar CEP supports multiple messaging models to move events between stages. For low delivery latency, we recommend the push model, in which events are sent from a producer to a consumer with at-most-once delivery semantics. If a consumer goes down or cannot keep up with the event traffic, it can signal the producer to temporarily push the events into a persistent queue like Kafka; the events can subsequently be replayed. Pulsar CEP can also be configured to support the pull model with at-least-once delivery semantics. In this case, all events will be written into Kafka, and a consumer will pull from Kafka.

What’s next

Pulsar has been deployed in production at eBay and is processing all user behavior events. We have open-sourced the Pulsar code and plan to continue developing it in the open, and we welcome everyone’s contributions. Below are some features we are working on. We would love to get your help and suggestions.

  • Real-time reporting API and dashboard
  • Integration with Druid or other metrics stores
  • Persistent session store integration
  • Support for long rolling-window aggregation

Please visit http://gopulsar.io for source code, documentation, and more information.

A Case Study in Empirical Bayes

Empirical Bayes is a statistical technique that is both powerful and easy to use. My goal is to illustrate that statement via a case study using eBay data. Quoting the famous statistician Brad Efron,

"Empirical Bayes seems like the wave of the future to me, but it seemed that way 25 years ago and the wave still hasn’t washed in, despite the fact that it is an area of enormous potential importance."

Hopefully this post will be one small step in helping Empirical Bayes to wash in! The case study I'll present comes from ranking the items that result from a search query. One feature that is useful for ranking items is their historical popularity. On eBay, some items are available in multiple quantities. For these, popularity can be measured by the number of times an item is sold divided by the number of times it is displayed, which I will call sales/impressions (S/I). By the way, everything I say applies to any ratio of counts, not just sales and impressions.

The problem

The problem I want to discuss is what to do if the denominator is small. Suppose that items typically have 1 sale per 100 impressions. Now suppose that a particular item gets a sale just after being listed. This is a typical item that has a long-term S/I of about 0.01, but by chance it got its sale early, say after the 3rd impression. So S/I is 1/3, which is huge. It looks like an enormously popular item, until you realize that the denominator I is small: it has received only 3 impressions. One solution is to pass the problem downstream, and give the ranker both S/I and I. Let the ranker figure out how much to discount S/I when I is small. Passing the buck might make sense in some situations, but I will show that it's not necessary, and that it's possible to pass a meaningful value even when I is small.

How to do that? Informally, I want a default value of S/I, and I want to gradually move from that default to the actual S/I as I increases. Your first reaction is probably to do this by picking a number (say 100), and if I < 100 use the default, otherwise S/I. But once you start to wonder whether 100 is the right number, you might as well go all the way and do things in a principled way using probabilities.

The solution

Jumping to the bottom line: the formula will be (S + α)/(I + γ). This clearly satisfies the desire to be near S/I when S and I are large. It also implies that the default value is α/γ, since that's what you get when S=I=0. In the rest of this post I will explain two things. First, how to pick α and γ (there is a right way and a wrong way). And second, where the shape of the formula (S + α)/(I + γ) comes from. If you're familiar with Laplace smoothing then you might think of using (S+1)/(I+1), and our formula is a generalization of that. But it still raises the question: why a formula of this form, rather than, for example, a weighted sum (1 - e^{-\alpha I})(S/I) + e^{-\alpha I}(\alpha/\gamma)?
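
To see how the formula behaves, here is a small Python sketch. The constants α = 1 and γ = 100 are made up for illustration (they give a default of 0.01) and are not fitted values from the post.

```python
def smoothed_ratio(sales, impressions, alpha, gamma):
    """Return (S + alpha) / (I + gamma), the smoothed sales/impressions ratio."""
    return (sales + alpha) / (impressions + gamma)


# Hypothetical constants, chosen so that the default alpha/gamma is 0.01.
ALPHA, GAMMA = 1.0, 100.0

no_data = smoothed_ratio(0, 0, ALPHA, GAMMA)            # falls back to alpha/gamma
early_sale = smoothed_ratio(1, 3, ALPHA, GAMMA)         # 1 sale in 3 impressions
well_observed = smoothed_ratio(100, 10000, ALPHA, GAMMA)  # plenty of data

print(no_data, early_sale, well_observed)
```

With these constants, the item with 1 sale in 3 impressions scores 2/103, about 0.019, rather than the raw 1/3, while the item with 10,000 impressions stays essentially at its raw S/I of 0.01.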

The formula (S + α)/(I + γ) comes from imagining that at each impression, there is a probability of an associated sale, and then returning the best estimate of that probability instead of returning S/I. I'll start with the simplest way of implementing this idea (although it is too simple to work well).

Suppose the probability of a sale has a fixed universal value p, so that whenever a user is shown an item, there is a probability p that the item is sold. This is a hypothetical model of how users behave, and it's straightforward to test if it fits the data. Simply pick a set of items, each with an observed sale count and impression count. If the simple model is correct, then the probability that an item with n impressions receives k sales is given by the binomial formula:

\Pr(\mbox{getting } k \mbox{ sales}) = \binom{n }{ k} p^{k}(1-p)^{n - k}


Here n is the number of impressions and k the number of sales. As mentioned earlier, this whole discussion also works for other meanings of k and n, such as k being clicks and n being impressions. To test the simple model, I can compare two sets of data. The first is the observed pairs (n,k). In other words, I retrieve historical info for each item, and record n impressions and k sales. I construct the second set by following the simple model: I take the actual number of impressions n, and randomly generate the number of sales k according to the formula above. Below is a histogram of the two data sets. Red is simulated (the model), and blue is actual. The match is terrible.

Bayes plot1

Here is some more detail on the plot: only items with a nonzero sale count are shown. In the simulation, 21% of items have S=0, but in the actual data 47% do.

So we need to go to a more sophisticated model. Instead of a fixed value of p, imagine drawing p from a probability distribution and plugging it into the inset equation, which is then used to get the random k. As you can see in the plot below, the two histograms have a much more similar shape than the previous plot, and so this model does a better job of matching the actual data.
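
The difference between the two models can be reproduced with a short simulation. The sketch below uses made-up inputs, not eBay data: every item gets 100 impressions, the fixed model uses p = 0.01, and the second model draws p from a Beta distribution with hypothetical parameters that have the same mean of 0.01.

```python
import random


def simulate_sales(n, p, rng):
    """Draw a binomial(n, p) sale count by flipping n independent coins."""
    return sum(rng.random() < p for _ in range(n))


def zero_sale_fraction(impressions, draw_p, rng):
    """Fraction of simulated items that end up with no sales at all."""
    zeros = sum(simulate_sales(n, draw_p(), rng) == 0 for n in impressions)
    return zeros / len(impressions)


rng = random.Random(0)
impressions = [100] * 5000   # hypothetical: every item has 100 impressions
ALPHA, BETA = 0.5, 49.5      # hypothetical Beta parameters with mean 0.01

fixed = zero_sale_fraction(impressions, lambda: 0.01, rng)
mixed = zero_sale_fraction(impressions, lambda: rng.betavariate(ALPHA, BETA), rng)
print(f"fixed p: {fixed:.2f} of items have zero sales")
print(f"Beta p:  {mixed:.2f} of items have zero sales")
```

Even though both models have the same average sale probability, drawing p per item produces noticeably more items with zero sales. That is the direction of the mismatch seen between the fixed-p simulation and the actual data.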

Bayes plot2

Now it all boils down to finding the distribution for p. Since 0 \leq p \leq 1, that means finding a probability distribution on the interval [0, 1]. The most common such distribution is the Beta distribution, which has two parameters, \alpha and \beta. By assuming a Beta distribution, I reduce the problem to finding \alpha and \beta (and yes, this α is the same one as in the formula (S + α)/(I + γ)). This I will do by finding the values of \alpha and \beta that best explain the observed values of k and n. More precisely, each of the N historical items has an associated sale count k_i and impression count n_i, with 1 \leq i \leq N.

I was perhaps a bit flip in suggesting the Beta distribution because it is commonly used. The real reason for selecting Beta is that it makes the computations presented in the Details section below much simpler. In the language of Bayesian statistics, the Beta distribution is conjugate to the binomial.

At this point you can fall into a very tempting trap. Each k_i/n_i is a number between 0 and 1, so all the values form a histogram on [0,1]. The possible values of p follow the density function for the Beta distribution and so also form a histogram on [0,1]. Thus you might think you could simply pick the values of \alpha and \beta that make the two histograms match as closely as possible. This is wrong, wrong, wrong. The values k_i/n_i are from a discrete distribution and often take on the value 0. The values of p come from a continuous distribution (Beta) and are never 0, or more precisely, the probability that p=0 is 0. The distributions of k/n and of p are incompatible.

In my model, I'm given n and I spit out k by drawing p from a Beta distribution. The Beta is invisible (latent) and indirectly defines the model. I'll give a name to the output of the model: X. Restating, fix an n and make X a random variable that produces value k with the probability controlled indirectly by the Beta distribution. I need to match the observed (empirical) values of (n_i, k_i) to X, not to Beta. This is the empirical Bayes part. I'll give an algorithm that computes \alpha and \beta later.

But first let me close the loop, and explain how all this relates to (S + α)/(I + γ). Instead of reporting S/I, I will report the probability of a sale. Think of the probability as a random variable (call it P). I will report the mean value of the random variable P. How to compute that? I heard a story about a math department that was at the top of a tall building whose windows faced the concrete wall of an adjacent structure. Someone had spray-painted on the wall "don't jump, integrate by parts." If it had been a statistics department, it might have said "don't jump, use Bayes' rule."

Bayes' rule involves a conditional probability. I want not the expected value of P, but the expected value of P conditional on n impressions and k sales. I can compute that from the conditional distribution \Pr(P = p \mid (n,k)). To compute this, flip the two sides of the | to get \Pr((n,k) \mid P=p). This is \Pr(\mbox{getting } k \mbox{ sales}), which is just the inset equation at the beginning of this post!

Now you probably know that in Bayes' rule you can't just flip the two sides, you also have to include the prior. The formula is really \Pr(P = p \mid (n,k)) = \mbox{constant} \times \Pr((n,k) \mid P = p) \Pr(P=p). And \Pr (P=p) is what we decided to model using the Beta distribution with parameters \alpha and \beta. These are all the ingredients for Empirical Bayes. I need \Pr(P = p \mid (n,k)), I evaluate it using Bayes' rule, the rule requires a prior, and I use empirical data to pick the prior. In empirical Bayes, I select the prior that best explains the empirical data. For us, the empirical data is the observed values of (n_i, k_i). When you do the calculations (below) using the Beta(\alpha, \beta) distribution as the prior, you get that the mean of P is (S + α)/(I + γ) where γ = α + β.

How does this compare with the simplistic method of using S/I when I > δ, and η otherwise? The simplistic formula involves two constants δ and η just as the principled formula involves two constants α and γ. But the principled method comes with an algorithm for computing α and γ given below. The algorithm is a few lines of R code (using the optimx package).

The details

I'll close by filling in the details. First I'll explain how to compute \alpha and \beta.

I have empirical data on N items. Associated with the i-th item (1 \leq i \leq N) is a pair (k_i, n_i), where k_i might be the number of sales and n_i the number of impressions, but the same reasoning works for clicks instead of sales. A model for generating the (k_i, n_i) is that for each impression there is a probability p that the impression results in a sale. So given n_i, the probability that k_i = j is \binom{n_i }{ j} p^{j}(1-p)^{n_i - j}. Then I add in that the probability p is itself random, drawn from a parametrized prior distribution with density function f_\theta(p). I generate the (k_i, n_i) in a series of independent steps. At step i, I draw p_i from f_\theta(p), and then generate k_i according to the binomial probability distribution on k_i:

\mbox{Prob}(k_i = j) = \binom{n_i }{ j} p_i^{j}(1-p_i)^{n_i - j}


Using this model, the probability of seeing (k_i, n_i) given n_i is computed by averaging over the different possible values of p, giving

q_i(\theta) = \int_0^1 \binom{n_i }{ k_i} p^{k_i}(1-p)^{n_i - k_i} f_\theta(p) dp


I'd like to find the parameter \theta that best explains the observed (k_i, n_i), and I can do that by maximizing the probability of seeing all those (k_i, n_i). The probability of seeing (k_i, n_i) is q_i(\theta), the probability of seeing the whole set is \prod_i q_i(\theta), and the log of that probability is \sum_i \log q_i(\theta). This is a function of \theta, and I want to find the value of \theta that maximizes it. This log probability is conventionally called the log-likelihood.

Since I'm assuming f_\theta(p) is a beta distribution with \theta = (\alpha, \beta), q_i(\theta) becomes

\begin{eqnarray*}
q_i(\alpha, \beta) & = & \dbinom{n_i }{ k_i} \int_0^1 p^{k_i}(1-p)^{n_i - k_i}
\frac{ \Gamma(\alpha + \beta)}{\Gamma(\alpha)\Gamma(\beta)} p^{\alpha-1}(1-p)^{\beta-1} dp \\
& = &
\dbinom{n_i }{ k_i} \frac{ \Gamma(\alpha + \beta)}{\Gamma(\alpha)\Gamma(\beta)}
\int_0^1 p^{k_i + \alpha -1}(1-p)^{n_i +\beta - k_i - 1} dp \\
& = &
\dbinom{n_i }{ k_i} \frac{B(\alpha + k_i, n_i + \beta - k_i)}{B(\alpha, \beta)}
\end{eqnarray*}
The calculation above uses the definition of the beta function B and the formula for the beta integral
\begin{eqnarray*}
B(\alpha,\beta) & = & \frac{\Gamma(\alpha)\Gamma(\beta)}{\Gamma(\alpha+\beta)} \\
\int_0^1 x^{\alpha-1} (1-x)^{\beta-1} dx & = & B(\alpha, \beta)
\end{eqnarray*}

If you don't want to check my calculations, q_i(\alpha, \beta) is just the beta-binomial distribution, and you can find its formula in many books and web pages.

Restating, finding \alpha and \beta means maximizing the log-likelihood l(\alpha, \beta) = \sum_i \log q_i(\alpha, \beta), specifically

l(\alpha, \beta) = \sum_i \left(\log \dbinom{n_i }{ k_i} + \log B(\alpha + k_i, n_i + \beta - k_i) - \log B(\alpha, \beta)\right)


And since the first term doesn't involve \alpha or \beta, you only need to maximize

\sum_{i=1}^N\log B(\alpha + k_i, n_i + \beta - k_i) - N\log B(\alpha, \beta)


The method I used to maximize that expression was the optimx package in R.
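
For readers who do not use R, the same maximization can be sketched with only the Python standard library. This is not the author's optimx code: it swaps in a basic hand-written pattern search over (log α, log β), and it runs on synthetic data generated from hypothetical parameters α = 2, β = 198 rather than on real (k_i, n_i) pairs.

```python
import math
import random

# Search directions in (log alpha, log beta) space, including diagonals.
DIRECTIONS = [(1, 0), (-1, 0), (0, 1), (0, -1), (1, 1), (-1, -1), (1, -1), (-1, 1)]


def log_beta(a, b):
    """log B(a, b), computed through log-gamma for numerical stability."""
    return math.lgamma(a) + math.lgamma(b) - math.lgamma(a + b)


def log_likelihood(alpha, beta, data):
    """Sum over items of log B(alpha + k, n + beta - k) - log B(alpha, beta).

    The binomial-coefficient term is dropped because it does not depend
    on alpha or beta.
    """
    base = log_beta(alpha, beta)
    return sum(log_beta(alpha + k, n + beta - k) - base for k, n in data)


def fit_beta(data, start=(1.0, 1.0), step=1.0, tol=1e-4, max_rounds=10_000):
    """Maximize the log-likelihood with a simple pattern search.

    Searching over the logs of alpha and beta keeps both parameters positive.
    """
    x = (math.log(start[0]), math.log(start[1]))
    best = log_likelihood(math.exp(x[0]), math.exp(x[1]), data)
    for _ in range(max_rounds):
        if step <= tol:
            break
        improved = False
        for dx, dy in DIRECTIONS:
            trial = (x[0] + step * dx, x[1] + step * dy)
            value = log_likelihood(math.exp(trial[0]), math.exp(trial[1]), data)
            if value > best:
                x, best, improved = trial, value, True
        if not improved:
            step /= 2  # no direction helped: refine the step size
    return math.exp(x[0]), math.exp(x[1])


# Synthetic (k, n) pairs drawn from the model itself, for illustration only.
rng = random.Random(1)
data = []
for _ in range(500):
    n = 200
    p = rng.betavariate(2.0, 198.0)
    k = sum(rng.random() < p for _ in range(n))
    data.append((k, n))

alpha_hat, beta_hat = fit_beta(data)
print(alpha_hat, beta_hat, alpha_hat / (alpha_hat + beta_hat))
```

The fitted mean α/(α + β) should land close to the 0.01 used to generate the data. The individual values of α and β are pinned down less tightly, because the likelihood is fairly flat along directions that keep their ratio fixed.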

The final missing piece is why, when I replace S/I with the probability that an impression leads to a sale, the formula is (k + \alpha)/(n + \gamma).

I have an item with an unknown probability of sale p. All that I do know is that it got k sales out of n impressions. If P is the random variable representing the sale probability of an item, and F = (k,n) is a random variable representing the sale/impression of an item, I want \Pr(P = p \mid F = (k, n)), which I write as \Pr(p \mid k, n) for short. Evaluate this using Bayes' rule,

\Pr(p \mid k, n) = \Pr(k,n \mid p) \Pr(p) / \Pr(k,n)


The \Pr(k,n) term can be ignored. This is not deep, but can be confusing. In fact, any factor \phi(k,n) involving only k and n (like 1/\Pr(k,n)) can be ignored. That's because \int \Pr(p \mid k, n) dp = 1, so if \Pr(p \mid k, n) = f(p,k,n)\phi(k,n) it follows that \phi can be recovered from f(p,k,n) using \phi(k,n) = 1/\int f(p,k,n) dp. In other words, I can simply ignore a \phi and reconstruct it at the very end by making sure that \int \Pr(p \mid k, n) dp = 1.

I know that

\Pr(k,n \mid p) = \binom{n }{ k} p^k(1-p)^{n-k}


For us, the prior \Pr(p) = f_\theta(p) = f_{\alpha, \beta}(p) is a beta distribution, \mbox{Beta}_{\alpha, \beta}(p) = p^{\alpha-1}(1~-~p)^{\beta-1}/B(\alpha, \beta). Some algebra then gives

\Pr(p \mid k, n) \propto \Pr(k,n \mid p) \Pr(p)\propto \mbox{Beta}_{\alpha + k, \beta + n - k}(p)


The \propto symbol ignores constants involving only k and n. Since both \Pr(p \mid k, n) and the rightmost term are probability densities in p that integrate to 1, the proportionality is an equality:

\Pr(p \mid k, n) = \mbox{Beta}_{\alpha + k, \beta + n - k}(p)
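
For reference, the algebra behind the proportionality step can be written out as follows. It uses only the binomial likelihood and the Beta prior already defined above. The factor \binom{n}{k}/B(\alpha, \beta) depends only on k and n (and the fixed parameters), so it is absorbed by \propto.

```latex
\begin{eqnarray*}
\Pr(k,n \mid p) \Pr(p)
& = & \binom{n}{k} p^{k}(1-p)^{n-k} \cdot \frac{p^{\alpha-1}(1-p)^{\beta-1}}{B(\alpha, \beta)} \\
& = & \frac{\binom{n}{k}}{B(\alpha, \beta)} \, p^{\alpha + k - 1}(1-p)^{\beta + n - k - 1} \\
& \propto & \frac{p^{\alpha + k - 1}(1-p)^{\beta + n - k - 1}}{B(\alpha + k, \beta + n - k)}
= \mbox{Beta}_{\alpha + k, \beta + n - k}(p)
\end{eqnarray*}
```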


For an item with (k,n) I want to know the value of p, but this formula gives the probability density for p. To get a single value I take the mean, using the fact that the mean of \mbox{Beta}_{\alpha, \beta} is \alpha/(\alpha+\beta). So the estimate for p is

\mbox{Mean}(\mbox{Beta}_{\alpha + k, \beta + n - k}) = \frac{\alpha + k}{\alpha + \beta + n}

This is just (S + α)/(I + γ) with γ = α + β.

There's room for significant improvement. For each item on eBay, you have extra information like the price. The price has a big effect on S/I, and so you might account for that by dividing items into a small number of groups (perhaps low-price, medium-price and high-price), and computing \alpha, \beta for each. There's a better way, which I will discuss in a future post.

HDFS Storage Efficiency Using Tiered Storage

At eBay, we run Hadoop clusters comprising thousands of nodes that are shared by thousands of users. We store hundreds of petabytes of data in our Hadoop clusters. In this post, we look at how to optimize big data storage based on frequency of data usage. This approach reduces storage cost effectively.

Hadoop and its promise

It is now common knowledge that commodity hardware can be grouped together to create a Hadoop cluster with big data storage and computing capability. Parts of the data are stored in each individual machine, and data processing logic is also run on the same machines.

hadoop_storage

For example: A 1,000-node Hadoop cluster with storage capacity of 20 TB per node can store up to 20 petabytes (PB) of data. All these machines have sufficient computing power to fulfill Hadoop’s motto of “take compute to data.”

Temperature of data

Different types of datasets are usually stored in the clusters, which are shared by different teams running different types of workloads to crunch through the data. Each dataset is enhanced and enriched by daily and hourly feeds through the data pipelines.

A common trait of datasets is heavy initial usage. During this period the datasets are considered HOT. Based on our analysis, usage then declines steadily: once the stored data is accessed only a few times a week, it has aged into WARM data. Over the next 90 days, when data usage falls to a few times a month, it is defined as COLD data.

So data can be considered HOT during its initial days, then it remains WARM in the first month. Jobs or applications use the data a few times during this period. The data’s usage goes down further; the data becomes COLD, and may be used only a handful of times in the next 90 days. Finally, when the data is very rarely used, at a frequency of once or twice per year, the “temperature” of the data is referred to as FROZEN.

Data Age                   Usage Frequency   Temperature
Age < 7 days               20 times a day    HOT
7 days < Age < 1 month     5 times a week    WARM
1 month < Age < 3 months   5 times a month   COLD
3 months < Age < 3 years   2 times a year    FROZEN
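
As a rough illustration, the age bands in the table can be turned into a small Python function. The day counts for "1 month" and "3 months", the handling of exact boundaries, and the treatment of data older than 3 years are assumptions, since the table does not specify them.

```python
def temperature(age_days):
    """Map a dataset's age in days to the temperature bands in the table above."""
    if age_days < 7:
        return "HOT"
    if age_days < 30:   # assumes 1 month = 30 days
        return "WARM"
    if age_days < 90:   # assumes 3 months = 90 days
        return "COLD"
    return "FROZEN"     # assumption: anything older stays FROZEN


for age in (1, 10, 45, 400):
    print(age, temperature(age))
```

A real classifier would probably also look at observed access counts, not just age.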

In general, a temperature can be associated with each dataset. In this case, the temperature falls as the data ages. Other factors can affect the temperature of a particular dataset. You can also write algorithms to determine the temperature of datasets.

Tiered storage in HDFS

HDFS has supported tiered storage since Hadoop 2.3.

How does it work?

Normally, a machine is added to the cluster, and local file system directories are specified to store the block replicas. The parameter used to specify the local storage directories is dfs.datanode.data.dir. Another tier, such as ARCHIVE, can be added using an enum called StorageType. To denote that a local directory belongs to the ARCHIVE tier, the directory is prefixed in the configuration with [ARCHIVE]. In theory, multiple tiers can exist, as defined by a Hadoop cluster administrator.

For example: Let’s add 100 nodes that contain 200 TB of storage per node to an existing 1,000-node cluster having a total of 20 PB of storage. These new nodes have limited computing capability compared to the existing 1,000 nodes. Let’s prefix all the local data directories on these nodes with [ARCHIVE]. These 100 nodes now form the ARCHIVE tier and can store 20 PB of data. The total capacity of the cluster is 40 PB, which is divided into two tiers – the DISK tier and the ARCHIVE tier. Each tier has 20 PB.

Mapping data to a storage tier based on temperature

For this example, we will store the heavily used HOT data in the DISK tier, which has nodes with better computing power.

For WARM data, we will keep most of its replicas in the DISK tier. For data with a replication factor of 3, we will keep two replicas in the DISK tier and one replica in the ARCHIVE tier.

If data is COLD, we will keep at least one replica of each block of the COLD data in the DISK tier. All the remaining replicas go to the ARCHIVE tier.

When a dataset is deemed FROZEN, which means it is almost never used, it is not optimal to store it on a node that has lots of CPU power to run many tasks or containers. We will keep it on a node that has minimal computing power. Thus, all the replicas of all blocks of FROZEN data can move to the ARCHIVE tier.
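
The mapping described in this section can be summarized in a short Python sketch. It models the rules in the text for illustration and is not an HDFS API. The function name is hypothetical, and the WARM rule for replication factors other than 3 (all but one replica on DISK) is an assumption.

```python
def replica_placement(temperature, replication=3):
    """Return (disk_replicas, archive_replicas) for a dataset temperature."""
    disk_by_temperature = {
        "HOT": replication,               # everything stays on DISK
        "WARM": max(replication - 1, 1),  # assumption: all but one on DISK
        "COLD": 1,                        # one replica on DISK, the rest archived
        "FROZEN": 0,                      # everything on ARCHIVE
    }
    disk = min(disk_by_temperature[temperature], replication)
    return disk, replication - disk


for temp in ("HOT", "WARM", "COLD", "FROZEN"):
    print(temp, replica_placement(temp))
```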

Data flow across tiers

When data is first added to the cluster, it gets stored in the default tier, DISK. Based on the temperature of the data, one or more replicas are moved to the ARCHIVE tier. Mover is used for data movement from one storage tier to another tier. Mover works similarly to Balancer except that it moves block replicas across tiers. Mover accepts an HDFS path, a replica count, and destination tier information. Then it identifies the replicas to be moved based on the tier information, and schedules the moves between source and destination data nodes.

Changes in Hadoop 2.6 to support tiered storage

Many improvements in Hadoop 2.6 further support tiered storage. You can attach a storage policy to a directory to denote it as HOT, WARM, COLD, or FROZEN. The storage policy defines the number of replicas to be located on each tier. It is possible to change the storage policy on a directory and then invoke Mover on that directory to make the policy effective.

Applications using data

Based on the data temperature, some or all replicas of data could be on either tier. But the location is transparent to applications consuming the data via HDFS.

Even though all the replicas of FROZEN data are on ARCHIVE storage, applications can still access it just like any HDFS data. Because ARCHIVE nodes have very little computing power, map tasks running on DISK nodes will read the data from ARCHIVE nodes, which leads to increased network traffic for the applications. If this occurs too frequently, you can declare the data as WARM/COLD, and Mover can move one or more replicas back to DISK.

The determination of data temperature and the designated replica movement to pre-defined tiered storage can be fully automated.

Tiered storage at eBay

Tiered storage is enabled in one of the very large clusters at eBay. The cluster had 40 PB of data. We added 10 PB of additional storage with limited computing power. Each new machine could store 220 TB. We marked the additional storage as ARCHIVE. We identified a few directories as WARM, COLD, or FROZEN. Based on their temperature, we moved all or a few replicas to the ARCHIVE storage.

The price per GB of the ARCHIVE tier is one quarter of the price per GB of the DISK tier. This difference is mainly because machines in the ARCHIVE tier have very limited computing power and hence lower costs.
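
To get a feel for the savings, here is a back-of-the-envelope Python sketch. It takes the ARCHIVE price per GB to be one quarter of the DISK price and uses the replica placement for replication factor 3 described earlier in this post. The prices are relative units, not actual eBay costs.

```python
DISK_PRICE = 1.0                # relative price per GB on the DISK tier
ARCHIVE_PRICE = DISK_PRICE / 4  # ARCHIVE at one quarter of the DISK price

# (DISK replicas, ARCHIVE replicas) for replication factor 3
PLACEMENT = {"HOT": (3, 0), "WARM": (2, 1), "COLD": (1, 2), "FROZEN": (0, 3)}


def relative_cost(temperature):
    """Relative cost of storing 1 GB of user data, counting all replicas."""
    disk, archive = PLACEMENT[temperature]
    return disk * DISK_PRICE + archive * ARCHIVE_PRICE


for temp in PLACEMENT:
    print(temp, relative_cost(temp))
```

Under these assumptions, storing a FROZEN dataset costs a quarter of what the same dataset costs while HOT, and COLD data costs half.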

Summary

Storage without computing is cheaper than storage with computing. We can use the temperature of the data to make sure that storage with computing is wisely used. Because each block of data is replicated a few times (the default is three), some replicas can be moved to the low-cost storage based on the temperature of the data. HDFS supports tiered storage and provides the necessary tools to move data between tiers. Tiered storage is enabled on one of the very large clusters at eBay to archive data.

Benoy Antony is an Apache Hadoop committer who focuses on HDFS and Hadoop security. Benoy works as a software engineer in the Global Data Infrastructure team at eBay.