Apache Eagle: Secure Hadoop in Real Time

Co-Authors: Chaitali Gupta and Edward Zhang

Update:  Eagle was accepted as an Apache Incubator project on October 26, 2015.

Today’s successful organizations are data driven. At eBay we have thousands of engineers, analysts, and data scientists who crunch petabytes of data every day to provide a great experience for our users. We execute at massive scale, using data to connect our millions of users in global commerce.

In recent years, Hadoop has become one of the most popular choices for Big Data analytics. eBay uses Hadoop to generate value from data for improving search experience, identifying and optimizing relevant advertisements, enriching our product catalogs, and performing click stream analysis to understand how eBay customers leverage our marketplace. Accordingly, we have a wide variety of workloads that include Hive, MapReduce, Spark, HBase, and hundreds of petabytes of data.

In this era of Big Data, security is ever more critical. At eBay, Hadoop data is secured. Our security approach follows these four pillars: access control, perimeter security, data classification, and data activity monitoring. Early in our journey, we recognized there was no product or solution that adequately supported our data activity monitoring needs given the scale and variety of use cases at eBay. To address this gap, eBay built Eagle.

Eagle is an open-source Data Activity Monitoring solution for Hadoop to instantly detect access to sensitive data or malicious activities, and to take appropriate actions.

We believe Eagle is a core component of Hadoop data security, and we want to share this capability with the open-source community. We will be open sourcing Eagle through the Apache Software Foundation. We are looking forward to working with the open-source development community.

Here are some of the Eagle data activity monitoring use cases:

  • Anomalous data access detection based on user behavior
  • Discovery of intrusions and security breaches
  • Discovery and prevention of sensitive data loss
  • Policy-based detection and alerting

Key Eagle qualities include the following:

  • Real time: We understand the importance of timing and acting fast in case of a security breach, so we designed Eagle to generate alerts in sub-second time and to stop anomalous activity if it is a real threat.
  • Scalability: At eBay, Eagle is deployed on multiple large Hadoop clusters with petabytes of data and 800 million access events every day.
  • Ease of use: Usability is one of our core design principles, and we have made it easy to get started. It takes only a few minutes to get up and running with the Eagle sandbox, and examples and policies can be added with a few clicks.
  • User profiles: Eagle provides capabilities to create user profiles based on user behavior in Hadoop. It ships with out-of-the-box machine-learning algorithms that you can leverage to build models with different HDFS features and get alerted on anomalies.
  • Open source: Eagle is built from the ground up using open-source standards and various products from the Big Data space. We decided to open-source Eagle to help the community, and we look forward to your feedback, collaboration, and support.
  • Extensibility: Eagle is designed with extensibility in mind. You can easily integrate Eagle with existing data classification and monitoring tools.

Eagle at a glance


1.a Eagle Architecture

1. Data collection

Eagle provides a programming API for extending Eagle to integrate any data source into the Eagle policy evaluation framework. For example, Eagle HDFS audit monitoring collects data from Kafka, which is populated from the NameNode log4j appender or from the logstash agent. Eagle Hive monitoring collects Hive query logs from running jobs through the YARN API, which is designed to be scalable and fault-tolerant.
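
For illustration, here is a minimal sketch of pushing audit-log lines into a Kafka topic with the standard Kafka producer API (the broker address and topic name are hypothetical; in practice the log4j appender or logstash agent mentioned above does this work):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AuditLogShipperSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9092"); // hypothetical broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One HDFS audit-log line as emitted by the NameNode
            String line = "2015-10-26 10:00:00,000 INFO FSNamesystem.audit: allowed=true ugi=alice ...";
            producer.send(new ProducerRecord<>("hdfs_audit_log", line)); // hypothetical topic name
        }
    }
}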

2. Data processing

2.1 Stream processing API: Eagle provides a stream processing API that is an abstraction over Apache Storm but is also extensible to other streaming engines. This abstraction allows developers to easily assemble data transformation, filtering, external data joins, and so on without being physically bound to a specific streaming platform. The Eagle streaming API also allows developers to easily integrate business logic with the Eagle policy engine. Internally, the Eagle framework compiles the business-logic execution DAG into the program primitives of the underlying stream infrastructure (for example, Apache Storm).

Here is an example of events and alerts processing in Eagle:

StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config); // storm
StreamProducer producer = env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1) // declare kafka source
       .flatMap(new AuditLogTransformer())           // transform event
       .groupBy(Arrays.asList(0))                    // group by the 1st field
       .flatMap(new UserProfileAggregatorExecutor()) // aggregate one-hour data by user
       .alertWithConsumer("userActivity", "userProfileExecutor"); // evaluate the ML policy
env.execute(); // execute stream processing and alerting

2.2 Alerting framework: The Eagle alerting framework includes a stream metadata API, a policy engine provider API for extensibility, and a policy partitioner interface for scalability.

The stream metadata API allows developers to declare the event schema including what attributes constitute an event, what each attribute’s type is, and how to dynamically resolve attribute values at runtime when the user configures a policy.

The policy engine provider API allows developers to plug in a new policy engine easily. The WSO2 Siddhi CEP engine is the policy engine that Eagle supports as a first-class citizen. A machine-learning algorithm is also wrapped into the framework as one type of policy engine.

Eagle’s extensible interface allows you to plug in different policy engines:

public interface PolicyEvaluatorServiceProvider {
  public String getPolicyType();     // literal string to identify one type of policy
  public Class getPolicyEvaluator(); // get the policy evaluator implementation
  public List getBindingModules();   // mapping from policy text in JSON format to objects
}

public interface PolicyEvaluator {
  public void evaluate(ValuesArray input) throws Exception;         // evaluate an input event
  public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef); // invoked when the policy is updated
  public void onPolicyDelete();                                     // invoked when the policy is deleted
}

The policy partitioner interface allows policies to be executed on different physical nodes in parallel. It also allows you to define your own policy partitioner class. These capabilities enable policy and event evaluation in a fully distributed fashion.


1.b Scalable Eagle policy execution framework

Scalability: Eagle supports policy partitioning so that a large number of policies can be evaluated in parallel.

public interface PolicyPartitioner extends Serializable {
    // method to distribute policies across physical partitions
    int partition(int numTotalPartitions, String policyType, String policyId);
}
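
For instance, a custom partitioner (a hypothetical sketch written only against the interface above, not code shipped with Eagle) could hash each policy across the available partitions:

public class HashPolicyPartitioner implements PolicyPartitioner {
    @Override
    public int partition(int numTotalPartitions, String policyType, String policyId) {
        // Spread policies evenly by hashing the policy type and id; the mask keeps the hash non-negative
        int hash = (policyType + ":" + policyId).hashCode() & Integer.MAX_VALUE;
        return hash % numTotalPartitions;
    }
}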

2.3 Machine-learning module:  Eagle provides capabilities to define user activity patterns or user profiles for Hadoop users based on the user behavior in the platform. The idea is to provide anomaly detection capability without setting hard thresholds in the system. The user profiles generated by our system are modeled using machine-learning algorithms and used for detection of anomalous user activities, where users’ activity pattern differs from their pattern history. Currently Eagle uses two algorithms for anomaly detection: Eigen-Value Decomposition and Density Estimation. The algorithms read data from HDFS audit logs, slice and dice data, and generate models for each user in the system. Once models are generated, Eagle uses the Storm framework for near-real-time anomaly detection to determine if current user activities are suspicious or not with respect to their model. The block diagram below shows the current pipeline for user profile training and online detection.


1.c User profile offline training & anomaly detection architecture

Eagle online anomaly detection uses the Eagle policy framework, and the user profile is defined as one of the policies in the system. The user profile policy is evaluated by a machine-learning evaluator extended from the Eagle policy evaluator. Policy definition includes the features that are needed for anomaly detection (same as the ones used for training purposes).

A scheduler runs a Spark-based offline training program (to generate user profiles or models) at a configurable time interval; currently, the training program generates new models once every month.

The following are some details on the algorithms.

2.3.1 Density Estimation—In this algorithm, the idea is to evaluate, for each user, a probability density function from the observed training data sample. We mean-normalize the training dataset for each feature so that all features are on the same scale. We use a Gaussian distribution function to compute the probability density, and we assume features are conditionally independent of one another; therefore, the final probability density can be computed by multiplying each feature’s probability density. During the online detection phase, we compute the probability of a user’s activity. If the probability of the user performing that activity falls below a threshold (determined by the training program using the Matthews Correlation Coefficient), we signal an anomaly alert.
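
As a rough illustration of the scoring step (a simplified sketch, not Eagle’s actual implementation; the means, variances, and threshold are assumed to come from the offline training program):

public class DensityEstimationSketch {

    /** Gaussian probability density of one feature value, given the trained mean and variance. */
    static double gaussian(double x, double mean, double variance) {
        return Math.exp(-(x - mean) * (x - mean) / (2 * variance)) / Math.sqrt(2 * Math.PI * variance);
    }

    /** Multiply per-feature densities (features assumed conditionally independent) and flag low-probability activity. */
    static boolean isAnomalous(double[] activity, double[] means, double[] variances, double threshold) {
        double probability = 1.0;
        for (int i = 0; i < activity.length; i++) {
            probability *= gaussian(activity[i], means[i], variances[i]);
        }
        return probability < threshold; // threshold picked offline, e.g. via the Matthews Correlation Coefficient
    }
}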

1.d Showing user behavior histogram on one dimension

2.3.2 Eigen-Value Decomposition—Our goal in user profile generation is to find interesting behavioral patterns for users. One way to achieve that goal is to consider a combination of features and see how each one influences the others. When the data volume is large, which is generally the case for us, abnormal patterns among features may go unnoticed due to the huge number of normal patterns. Since normal behavioral patterns tend to lie within a low-dimensional subspace, we can reduce the dimensionality of the dataset to better understand the user behavior pattern; this also reduces any noise in the training dataset. Based on the amount of variance of the data we want to retain for a user, usually 95% in our case, we find the number of principal components k that explains that variance. We consider the first k principal components as the normal subspace for the user, and the remaining (n-k) principal components as the abnormal subspace.

During online anomaly detection, if the user behavior lies near the normal subspace, we consider the behavior normal. On the other hand, if the user behavior lies near the abnormal subspace, we raise an alarm, as we believe usual user behavior should generally fall within the normal subspace. We use the Euclidean distance to determine whether a user’s current activity is nearer the normal or the abnormal subspace.
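
The online check can be sketched as follows (a simplified illustration, assuming the normal-subspace principal components are available as orthonormal vectors from the offline training step):

public class SubspaceAnomalySketch {

    /**
     * Distance of a (mean-normalized) activity vector from the normal subspace: subtract its
     * projection onto the first k principal components and measure the Euclidean norm of the rest.
     */
    static double distanceFromNormalSubspace(double[] activity, double[][] normalComponents) {
        double[] residual = activity.clone();
        for (double[] component : normalComponents) {   // each component is a unit-length vector
            double projection = 0.0;
            for (int i = 0; i < activity.length; i++) {
                projection += activity[i] * component[i];
            }
            for (int i = 0; i < residual.length; i++) {
                residual[i] -= projection * component[i];
            }
        }
        double sumOfSquares = 0.0;
        for (double r : residual) {
            sumOfSquares += r * r;
        }
        return Math.sqrt(sumOfSquares); // a large distance means the activity falls in the abnormal subspace
    }
}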

1.e Showing important user behavior components

3. Eagle services

3.1 Policy Manager: The Eagle Policy Manager provides a UI and RESTful API for users to define policies. The Eagle user interface makes it easy to manage policies with a few clicks, mark or import sensitivity metadata, browse HDFS or Hive resources, access alert dashboards, etc.

Here is a single-event evaluation policy (one user accessing a sensitive column in Hive):

from hiveAccessStream[sensitivityType=='PHONE_NUM'] select * insert into outputStream;

Here is a window-based policy (one user accessing /tmp/private 5 times or more within 10 minutes):

hdfsAuditLogEventStream[(src == '/tmp/private')]#window.externalTime(timestamp,10 min) select user, count(timestamp) as aggValue group by user having aggValue >= 5 
insert into outputStream;

3.2 Query Service: Eagle provides a SQL-like service API to support comprehensive computation for huge sets of data—comprehensive filtering, aggregation, histogram, sorting, top, arithmetical expression, pagination, etc. Although Eagle supports HBase for data storage as a first-class citizen, a relational database is supported as well. For HBase storage, the Eagle query framework compiles a user-provided SQL-like query into HBase native filter objects, and then executes it through the HBase coprocessor on the fly.


Eagle at eBay

Eagle data activity monitoring is currently being used to monitor data access activities in a 2500-node Hadoop cluster, with plans to extend it to other Hadoop clusters covering 10,000 nodes by the end of this year. We started with a basic set of policies on HDFS/Hive data and will be ramping up more policies by the end of this year. Our policies cover access patterns, commonly accessed data sets, predefined queries, Hive tables and columns, and HBase tables, as well as policies based on user profiles generated by ML models. We have a wide range of policies to stop data loss, data copying to unsecured locations, sensitive data access from unauthorized zones, etc. The flexibility of creating policies in Eagle allows us to expand further and add more complex policies.

What’s next

In addition to data activity monitoring, at eBay the Eagle framework is used extensively to monitor the health of nodes, Hadoop apps, core services, and the entire Hadoop cluster. We have also built in a lot of automation around remediation of nodes, which helped us reduce our manual workload to a large extent.

Below are some of the features we are currently working on and will be releasing in the next version:

  • Machine-learning models for Hive and HBase data access events
  • Extensible API for integration with external tools for reporting and data classification
  • New module for Hadoop cluster monitoring, including:
    • HBase access monitoring
    • Hadoop job performance monitoring
    • Hadoop node monitoring

Please visit https://github.com/eBay/Eagle for more information. 

GZinga: Seekable and Splittable Gzip

Co-Author:  Mahesh Somani

Generally, data compression techniques are used to conserve space and network bandwidth. Widely used compression techniques include Gzip, bzip2, lzop, and 7-Zip. According to performance benchmarks, lzop is one of the fastest compression algorithms, while bzip2 has a high compression ratio but is very slow. Gzip sits between the two, balancing speed and compression ratio. Gzip is based on the DEFLATE algorithm, which is a combination of LZ77 and Huffman coding. The zlib software library writes compressed data with a Gzip wrapper. (Note that in this post we will use Gzip and zlib interchangeably.) With Gzip, compression is as fast as or faster than serial writes on disk, and the compression ratio is far better than with lzop. For decompression as well, Gzip performs better than other algorithms.

These are the reasons why Gzip is one of the most popular and widely used data compression techniques in the industry. However, Gzip has limitations: you cannot randomly access a Gzip file, and you cannot split a Gzip file in the case of Hadoop map-reduce jobs. As a result, Gzip is slower in those scenarios, and it does not leverage the power of parallel processing.

The eBay Cloud Platform team is happy to announce the open-source project GZinga, which aims to provide two additional capabilities for Gzip-compressed files:

  • Seekable: Random access within Gzip files
  • Splittable: Division of a Gzip file into multiple chunks


It is common to collect logs from production applications and use them to debug and triage issues. Log files are generally rolled over periodically (hourly, daily) or based on size. To save disk space, log files are stored in a compressed format such as Gzip.

In most outage scenarios, developers are interested in looking at logs for a particular activity or around a certain timestamp. Scanning an entire file to find the log for a particular time period is costly. The data for these logs can be stored on commodity storage like Hadoop. However, to take advantage of Hadoop’s capabilities, small chunks of large Gzip files need to be processed in parallel (for files beyond a few hundred MBs).

The idea of GZinga originated as a means to provide optimal performance for reading/processing Gzip files. Though we have looked at options for log files, the technique we’ve developed can apply to other use cases—textual documents, web crawls, weather data, etc.

Seekable Gzip: write

In this section, we will discuss generating Gzip files with meta information that will be used to perform random access in those files. We use two techniques from the Gzip specification—multiple compressed blocks and the Gzip header flags format—to make files seekable.

Multiple compressed blocks

A Gzip file consists of a series of “members” (compressed data sets). The format of each member is specified as follows:

   <Header>Compressed Data<Footer>

As a Gzip file can have a series of such members, an actual Gzip file can look like this:

   <Header>Compressed Data<Footer><Header>Compressed Data<Footer>….

Such multiple compressed blocks can be achieved using the Z_FULL_FLUSH option, according to the zlib manual. These blocks help us in reading a file from any arbitrary location after jumping to its inner header location. The main advantages of this feature are that it is not necessary to uncompress the entire file and it is possible to read from different locations. Only the requested compressed block will be uncompressed, which improves performance considerably.
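
A quick way to see this behavior with the standard Java library (a sketch, not GZinga’s writer) is to write several gzip members back to back into one file; GZIPOutputStream.finish() writes a member’s footer without closing the underlying stream, and standard tools decompress the concatenated members as a single stream:

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class MultiMemberGzipSketch {
    public static void main(String[] args) throws IOException {
        try (OutputStream out = new FileOutputStream("multi-member.gz")) {
            for (int block = 0; block < 3; block++) {
                GZIPOutputStream member = new GZIPOutputStream(out);
                member.write(("log lines for block " + block + "\n").getBytes(StandardCharsets.UTF_8));
                member.finish(); // write this member's footer, but keep 'out' open for the next member
            }
        }
        // 'gzip -dc multi-member.gz' (or GZIPInputStream) yields the three blocks as one stream.
    }
}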

As the deflater is reset for every block, compression efficiency (both size and processing time) diminishes. However, we’ve observed that when data is compressed using zlib, the impact on the compression ratio is minimal. The following table shows the difference in compression and timing for logs, with and without the multiple-blocks approach. In all of these scenarios, 60 blocks are written (1 per minute for an hourly log file).

Original file size (in MB) | Time taken with compressed blocks (in sec) | Time taken without compressed blocks (in sec) | Δ increase in time (in sec) | File size with compressed blocks (in MB) | File size without compressed blocks (in MB) | % increase in file size
8.12 | 0.41 | 0.387 | 0.023 | 1.078 | 1.076 | 0.19
105.5 | 3.685 | 3.49 | 0.195 | 13.982 | 13.981 | 0.007
527.86 | 14.54 | 14.12 | 0.420 | 69.904 | 69.903 | 0.001
2111.47 | 53.44 | 52.6 | 0.840 | 279.614 | 279.6 | 0.0005

It is evident that when the size per block is greater than 1 MB, the processing and space overhead is minimal and can be ignored. (A much smaller block size adds little value, since blocks of this size are already fast to decompress.)

Header flags format

The header section has provision for extra comments, according to the Gzip specification. Each header has the following format:

|ID1|ID2|CM |FLG|     MTIME     |XFL|OS | (more-->)
  1   2   3   4   5   6   7   8   9   10

The 4th byte of the header represents the flag information:

bit 0   FTEXT
bit 1   FHCRC
bit 2   FEXTRA
bit 3   FNAME
bit 4   FCOMMENT
bit 5   reserved
bit 6   reserved
bit 7   reserved

If bit 4 (FCOMMENT) is set, a zero-terminated file comment follows the fixed header fields. We use this flag to store an index for the file. The interface for providing the index is shown below in the GZipOutputStreamRandomAccess class:

class GZipOutputStreamRandomAccess extends DeflaterOutputStream {
    // Maintains the index: for a given key, the value is the byte offset in the gzip file
    private Map<Long, Long> offsetMap = new LinkedHashMap<Long, Long>();

    /** Adds the current byte offset (in the gzip file) for the given key. */
    public void addOffset(Long key) {...}

    /** Writes a header whose extra comment contains the entries from offsetMap. */
    private void writeHeaderWithComment() {...}
}

The comment section in the header field contains the index information collected up to that point. Note that standard Gzip libraries that don’t look for comments or extra fields simply ignore this comment block. The comments are added as name/value pairs, as shown below:

|ID1|ID2|CM |FLG|     MTIME     |XFL|OS | key1:value1;key2:value2;........;0|
  1   2   3   4   5   6   7   8   9   10  Index in specific format ending with 0
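
For illustration, serializing the collected offsets into that comment format could look like this (a hypothetical helper, not GZinga’s actual code):

public class IndexCommentSketch {

    /** Builds the "key1:value1;key2:value2;...;0" comment string shown above from the offset map. */
    static String buildIndexComment(java.util.Map<Long, Long> offsetMap) {
        StringBuilder comment = new StringBuilder();
        for (java.util.Map.Entry<Long, Long> entry : offsetMap.entrySet()) {
            comment.append(entry.getKey()).append(':').append(entry.getValue()).append(';');
        }
        return comment.append('0').toString(); // terminated with 0, as in the layout above
    }
}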

At the time of the file close operation, an extra block is added with a header that contains information about all indices, and a footer without any data. This extra block can be used in locating the header deterministically while reading, as described in the next section. The effective size of the file increases due to the comment block as described below (in a test with 60 blocks):

Original file size (in MB) | Time taken with index data (in sec) | Time taken without index data (in sec) | Δ increase in time (in sec) | File size with index data (in MB) | File size without index data (in MB) | % increase in file size
8.12 | 0.582 | 0.387 | 0.195 | 1.154 | 1.08 | 6.85
105.5 | 3.926 | 3.49 | 0.436 | 14.09 | 13.98 | 0.79
527.86 | 15 | 14.12 | 0.880 | 70.02 | 69.9 | 0.17
2111.47 | 54.35 | 52.6 | 1.750 | 279.72 | 279.6 | 0.04

Seekable Gzip: read

To take advantage of the index written in the header, the file is first opened for random access and the last few chunks of data are read. The header is then located by byte comparison (reading in reverse order), and the index information is extracted and loaded into memory. Based on the requested index entry, the read position is moved to the required location and the stream is passed to Gzip for decompression of the data. The interface for jumping to the desired location and reading the metadata information is shown below in the GZipInputStreamRandomAccess class:

class GZipInputStreamRandomAccess extends GZIPInputStream {

    /** Returns metadata (index) information for the given file. */
    public Map<Long, Long> getMetadata();

    /** Jumps to the location for the specified key. If the specified key does not exist, it jumps to the beginning of the file. */
    public void jumpToIndex(Long index) throws IOException;
}


Reading the index and then jumping to the desired location improves performance dramatically. The following table provides the numbers for jumping to the last block of data and reading one record from there:

Original file size (in MB) | Compressed file size (in MB) | Time taken with index data (in sec) | Time taken without index data (in sec)
8.12 | 1.08 | 0.162 | 0.266
105.5 | 13.98 | 0.166 | 0.87
527.86 | 69.9 | 0.167 | 3.59
2111.47 | 279.6 | 0.17 | 13.71

We also observed that it takes the same amount of time irrespective of which index needs to be jumped to before reading one record. Overall, this approach provides significant improvement in performance, with the extent of improvement depending on file size (e.g., 10-50x gain).

Random access in Hadoop

SeekableGZipDataInputStream is an extension of Hadoop’s FSDataInputStream class to enable random access for Gzip files that are stored inside Hadoop. Using this extended class, one can jump to any location in a Gzip file stored in Hadoop and read required data from there with much faster performance. Here is an example of using this class to perform random access:

FSDataInputStream fin = fs.open(new Path("testfile"));
long len = fs.getFileStatus(new Path("testfile")).getLen();
SeekableGZipDataInputStream sin = new SeekableGZipDataInputStream(fin, len);
GZipInputStreamRandomAccess gzin = new GZipInputStreamRandomAccess(sin);
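
From there, the index can be read and used to jump to a block of interest (the key value below is hypothetical):

Map<Long, Long> index = gzin.getMetadata(); // key -> byte offset entries written when the file was closed
gzin.jumpToIndex(1234L);                    // position the stream at the block recorded for this key
// read records from this point, e.g. with new BufferedReader(new InputStreamReader(gzin))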

Splittable Gzip

In many Hadoop production environments, you get Gzipped files as the raw input. When these Gzipped files are put into Hadoop, an MR job runs with only one map task, as a Gzip file is not splittable. This is the biggest disadvantage of Gzip, because it defeats the real power of Hadoop’s parallel processing. But when we generate Gzip files with the Seekable approach described above, it becomes possible to split a Gzip file and feed each split to an individual map task for processing.

Determining the split

A Gzip file with multiple compressed blocks allows for splitting the file based on the start of a new header. In our approach, we implemented a SplittableCompressionCodec interface for the GzipCodec class. Using this interface, when a split is being invoked with the “start” and “end” position of the split provided, it locates the start of the header from the provided “start” (and “end”) position and sets the new “start” (and “end”) pointer once located. For example, given the following split as the original (or default):


The new “start” and “end” location will be set as shown below:


Compressed data before the “start” position will be processed by the previous split (both the “start” and “end” positions will point to the next header).

Configuration changes

In order to use this feature, one needs to set the io.compression.codecs property in the configuration object before submitting the MR job. Instead of org.apache.hadoop.io.compress.GzipCodec for Gzip files, the value should be io.gzinga.hadoop.SplittableGzipCodec.
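
A minimal job-setup sketch (using the standard Hadoop Configuration and Job APIs; only the property value comes from the text above, the rest is assumed):

Configuration conf = new Configuration();
// Register the GZinga codec for .gz inputs instead of the stock org.apache.hadoop.io.compress.GzipCodec
conf.set("io.compression.codecs", "io.gzinga.hadoop.SplittableGzipCodec");
Job job = Job.getInstance(conf, "wordcount-on-splittable-gzip"); // hypothetical job name
// ... set mapper, reducer, and input/output paths as usual, then submit the job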


This value can also be set in the mapred-site.xml file to take effect for all MR jobs.

Another important property to set is for splitting the size. Property mapreduce.input.fileinputformat.split.maxsize indicates the maximum split size. The number of splits will depend upon the value of this property and the actual Gzip file size. The greater the number of splits, the faster the MR job. Below are performance numbers indicating improvement with respect to splits (where the input Gzip file size is 280MB and the MR job is to perform the wc command on the file):

Split size (MB) | # of splits | Avg map time (sec) | Job time (sec)
32 | 9 | 24 | 50
64 | 5 | 36 | 55
128 | 3 | 52 | 79
300 | 1 | 127 | 139

As these performance numbers show, the map task took 127 seconds when run with a single split, compared to 52 seconds when run with 3 map tasks. Also note that when the split size decreases (and the number of splits increases), the map task time decreases accordingly.


Gzip provides one of the best compression algorithms and is widely used in the industry. But with its lack of support for random access and file splitting, it cannot fully leverage the power of parallel and distributed processing systems like Hadoop. With the Seekable and Splittable features introduced by GZinga, high-performance Hadoop access to Gzip files becomes possible.

Company-wide Hack Week Encourages Innovation

What happens when you give more than 2500 employees an entire week to develop an innovation concept they are passionate about? Amazing things happen.

Last month, eBay Chief Product Officer RJ Pittman and eBay Chief Technology Officer Steve Fisher piloted a new program, Hack Week, that gave everyone in their organizations an entire week to design, develop, and deliver a new innovation concept.

In a bold move, we adjusted our roadmap commitments to give our innovators a dedicated week to focus on the ideas that inspire them and create innovative concepts that deliver value for our customers. From our interns, to new hires fresh out of college, to design experts, to data researchers, to our most seasoned programmers, everyone came together in the spirit of innovation.

More than 400 concepts were submitted, involving 7 major eBay offices around the world. Concepts included wearables, on-demand manufacturing, digital goods, computer vision, cryptocurrencies, security, natural language processing, machine learning, voice recognition, search science, and more. Innovations spanned product marketing, policy, business process, data insights, and technology infrastructure. Add in exciting activities like dueling drones, battling robots, and virtual reality gaming, and the campus was buzzing with energy and excitement.

One of our innovators, Robinson Piramuthu, shared his experience:

An event like Hack Week is like a marathon and tests our perseverance and rapid coding. We were allowed to choose the topics that inspired us, to be entrepreneurs driving the ideas that will transform eBay.

Our project involves computer vision, which is an emerging topic at eBay with still-developing resources and infrastructure. The challenge is how to put a complex system together in one week, from the UI to the back-end servers. Prior to Hack Week, a lot of planning happened mentally: picturing how the product would look, what features it would support, which features already existed (in some other form), and which needed to be developed in a short time.

During Hack Week there were some organic unexpected developments that involved code optimization, UI simplification, and believe it or not, algorithm development. The most difficult day was the day before the final day. We encountered some problems with the back-end server that we postponed since we were focusing a lot on the front end. But it was a treat when we fixed the problem.

It was great to have full support from our eBay leaders during Hack Week. The continued focus without interruptions made it very productive.

Each location recognized its innovators with local awards, and several concepts are already underway, having picked up executive sponsors during the Hack Week demos. The best concepts from each location will now move on to be reviewed by the executive leadership team for sponsorship of some of the larger, cross-domain innovation concepts.

Events like Hack Week, which are part of eBay’s larger Innovation Programs, are critical drivers of our continuing cultural transformation at eBay. They inspire our talent, help them develop new skills, and give them a channel for their voices to be heard. Driven by our purpose of delivering connected commerce and creating economic opportunity, our mandate to innovate to help our customers be successful is stronger than it has ever been.