
embedded-druid: Leveraging Druid Capabilities in Stand-alone Applications

Co-authors: Ramachandran Ramesh, Mahesh Somani, and Sankar Venkatraman

The eBay Cloud Platform team is happy to announce the open-source project embedded-druid, which aims to provide Druid capabilities for reasonably small amounts of data without the complexity of a multi-node setup. In other words, embedded-druid is Druid running in a single JVM process.

Background

Druid is an open-source data store designed for real-time exploratory analytics on large data sets. Combining a column-oriented storage layout, a distributed, shared-nothing architecture, and an advanced indexing structure, Druid allows for the arbitrary exploration of billion-row tables with sub-second latencies. It also supports fast aggregations and OLAP queries. Druid is a proven technology for executing OLAP-style queries over Big Data with sub-second response times.

Motivation

Given its distributed, shared-nothing architecture for large amounts of data, Druid has multiple components: real-time node, historical node, broker node, coordinator node, deep storage, MySQL, ZooKeeper, etc. If the input data size is small (say, up to hundreds of millions of rows), then the overhead involved in deploying Druid might be excessive; one might prefer a lightweight database system like Apache Derby or PostgreSQL if the reporting requirement is very simple (such as grouping by some dimension or retrieving topN values). However, there are numerous use cases where the input is not “Big Data” but rather medium or small data that still requires OLAP-like capabilities, such as grouping by multiple dimensions and computing percentiles and other aggregation functions.

For example, at eBay we generate operational metrics reports for applications that run on multiple machines across data centers. These reports contain total request counts, average request durations, and so on across different dimensions, such as the type of request, data center, request status, and dependency. Each application owner might view different slices of this report – top hosts with errors, slowest requests by request type or data center, or requests by error code, to name a few. Given the dynamic nature of such queries, if Druid capabilities can be leveraged without the deployment complexity, then the lives of developers, debuggers, and analysts become much easier.

embedded-druid in action

Let us assume a simple data set that records the number of characters added to Wikipedia pages. We have the following columns representing the dimensions of our data:

Timestamp, Page, Username, Gender, City

Here is the metric we are interested in:

CharsAdded

The following table shows sample data for the above schema:

Timestamp | Page | Username | Gender | City | CharsAdded
1234567   | JB   | Abc      | Male   | SF   | 1290
1234576   | JB   | Xyz      | Female | SJ   | 3421
1234687   | AB   | Qwe      | Male   | LA   | 2345
1234789   | AB   | Def      | Female | LV   | 1234

We want to generate different reports, such as “How many edits were made on page AB grouped by gender?” or “What is the histogram for edits made on page JB in SF?” The following sections walk through how to generate such reports using embedded-druid.

Creating the loader

Currently, embedded-druid supports loading CSV files, for which the implementation class CSVLoader is available. One needs to first provide a list of all columns available in the CSV file (including metrics), a list of dimensions, and a column specifying the timestamp (if available). For example, for the Wikipedia schema mentioned above, the CSV file might have data in this format:

Timestamp, Page, Username, Gender, City, metric, value
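
For instance, the sample rows shown earlier would appear in this long format roughly as follows (illustrative only; one line per metric value):

1234567,JB,Abc,Male,SF,CharsAdded,1290
1234576,JB,Xyz,Female,SJ,CharsAdded,3421
1234687,AB,Qwe,Male,LA,CharsAdded,2345
1234789,AB,Def,Female,LV,CharsAdded,1234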

The following code creates the Loader object required to load this data in memory:

List<String> columns = Arrays.asList("Timestamp", "Page", "Username", "Gender", "City", "metric", "value");
List<String> metrics = Arrays.asList("value");
List<String> dimensions = new ArrayList<String>(columns);
dimensions.removeAll(metrics);

// 'reader' is a java.io.Reader over the CSV data; the last argument names the timestamp column.
Loader loader = new CSVLoader(reader, columns, dimensions, "Timestamp");

Creating Druid segment/index files

Druid generates segment and index files for the given input. Once the Loader object is created, one specifies how the segment and index files should be built for querying, including the available dimensions and the aggregator functions to use. For example, to query values such as total count, max, min, total sum, and percentiles, the following AggregatorFactory objects are created:

DimensionsSpec dimensionsSpec = new DimensionsSpec(dimensions, null, null);
AggregatorFactory[] metricsAgg = new AggregatorFactory[] {
        new LongSumAggregatorFactory("agg_count", "count"),
        new MaxAggregatorFactory("agg_max", "max"),
        new MinAggregatorFactory("agg_min", "min"),
        new DoubleSumAggregatorFactory("agg_sum", "sum"),
        new ApproximateHistogramAggregatorFactory("agg_histogram", "value", null, null, null, null)
    };

To create segment and index files locally, one needs to create a QueryableIndex object as follows:

IncrementalIndexSchema indexSchema = new IncrementalIndexSchema(0, QueryGranularity.ALL, dimensionsSpec, metricsAgg);
QueryableIndex index = IndexHelper.getQueryableIndex(loader, indexSchema);

By default, segment files are created at the location System.getProperty("druid.segment.dir"). If this property is not set, then the files are created at the temporary location System.getProperty("java.io.tmpdir") + File.separator + "druid-tmp-index-". Therefore, to create segment files at a specific location, the "druid.segment.dir" property needs to be set first.
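
For instance, a one-line way to do this before building the index (the directory shown is only an illustration):

System.setProperty("druid.segment.dir", "/tmp/embedded-druid-segments"); // any writable directory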

Querying data

Once segment files are created, one can execute different kinds of queries using the QueryableIndex object. For example, if one wants to execute GroupByQuery for the above mentioned schema, then the code looks like this:

List<DimFilter> filters = new ArrayList<DimFilter>();
filters.add(DimFilters.dimEquals("Page", "JB"));
filters.add(DimFilters.dimEquals("Gender", "Male"));
filters.add(DimFilters.dimEquals("metric", "CharsAdded"));

GroupByQuery query = GroupByQuery.builder()
        .setDataSource("test")
        .setQuerySegmentSpec(QuerySegmentSpecs.create(new Interval(0, new DateTime().getMillis())))
        .setGranularity(QueryGranularity.NONE)
        .addDimension("City")
        .addAggregator(new LongSumAggregatorFactory("agg_count", "agg_count"))
        .addAggregator(new MaxAggregatorFactory("agg_max", "agg_max"))
        .addAggregator(new MinAggregatorFactory("agg_min", "agg_min"))
        .addAggregator(new DoubleSumAggregatorFactory("agg_sum", "agg_sum"))
        .addAggregator(new ApproximateHistogramFoldingAggregatorFactory("agg_histogram", "agg_histogram", 20, 5, null, null))
        .addPostAggregator(new QuantilesPostAggregator("agg_quantiles", "agg_histogram", new float[] {0.25f, 0.5f, 0.75f, 0.95f, 0.99f}))
        .setFilter(DimFilters.and(filters))
        .build();

Sequence<Row> sequence = QueryHelper.run(query, index);
ArrayList<Row> results = Sequences.toList(sequence, Lists.<Row>newArrayList());
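
Each entry in results is a Druid Row holding the grouped dimension value plus the aggregated metrics. A minimal sketch that simply iterates and prints them (how the rows are consumed will depend on your application):

for (Row row : results) {
    System.out.println(row); // e.g. City plus agg_count, agg_max, agg_min, agg_sum, agg_quantiles
}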

Similarly, here is the code snippet for executing TopNQuery:

List<DimFilter> filters = new ArrayList<DimFilter>();
filters.add(DimFilters.dimEquals("Page", "JB"));
filters.add(DimFilters.dimEquals("Gender", "Male"));
filters.add(DimFilters.dimEquals("metric", "CharsAdded"));

TopNQuery query =
        new TopNQueryBuilder()
            .threshold(5)
            .metric("agg_count")
            .dataSource("test")
            .intervals(QuerySegmentSpecs.create(new Interval(0, new DateTime().getMillis())))
            .granularity(QueryGranularity.NONE)
            .dimension("City")
            .aggregators(
                Arrays.<AggregatorFactory>asList(
                    new LongSumAggregatorFactory("agg_count", "agg_count"),
                    new MaxAggregatorFactory("agg_max", "agg_max"),
                    new MinAggregatorFactory("agg_min", "agg_min"),
                    new DoubleSumAggregatorFactory("agg_sum", "agg_sum")))
            .filters(DimFilters.and(filters))
            .build();

Sequence<Result> sequence = QueryHelper.run(query, index);
ArrayList<Result> results = Sequences.toList(sequence, Lists.<Result>newArrayList());

Future work

We are planning to extend this work by providing (and/or integrating) REST APIs for ingestion and for querying Druid data. For visualization, we also plan to integrate with an easy-to-use UI like Grafana. These enhancements will help users analyze data quickly and surface meaningful information promptly.

GZinga: Seekable and Splittable Gzip


Generally, data compression techniques are used to conserve space and network bandwidth. Widely used compression techniques include Gzip, bzip2, lzop, and 7-Zip. According to performance benchmarks, lzop is one of the fastest compression algorithms, while bzip2 achieves a high compression ratio but is very slow. Gzip sits in between, compressing better than lzop but not as well as bzip2. Gzip is based on the DEFLATE algorithm, which is a combination of LZ77 and Huffman coding. The zlib software library writes compressed data with a Gzip wrapper. (Note that in this post we will use Gzip and zlib interchangeably.) With Gzip, compression is as fast as or faster than serial writes on disk, and the compression ratio is far better than with lzop. For decompression as well, Gzip performs better than other algorithms.

These are the reasons why Gzip is one of the most popular and widely used data compression techniques in the industry. However, Gzip has limitations: a Gzip file cannot be accessed at random, and it cannot be split across tasks in a Hadoop map-reduce job. As a result, Gzip is slower in those scenarios, and it cannot leverage the power of parallel processing.

The eBay Cloud Platform team is happy to announce the open-source project GZinga, which aims to provide two additional capabilities for Gzip-compressed files:

  • Seekable: Random search within Gzip files
  • Splittable: Division of a Gzip file into multiple chunks

Motivation

It is common to collect logs from production applications and use them to debug and triage issues. Log files are generally rolled over periodically (hourly, daily) or based on size. To save disk space, log files are stored in a compressed format such as Gzip.

In most outage scenarios, developers are interested in looking at logs for a particular activity or around a certain timestamp. Scanning an entire file to find the log for a particular time period is costly. The data for these logs can be stored on commodity storage like Hadoop. However, to take advantage of Hadoop’s capabilities, small chunks of large Gzip files need to be processed in parallel (for files beyond a few hundred MBs).

The idea of GZinga originated as a means to provide optimal performance for reading/processing Gzip files. Though we have looked at options for log files, the technique we’ve developed can apply to other use cases—textual documents, web crawls, weather data, etc.

Seekable Gzip: write

In this section, we will discuss generating Gzip files with meta information that will be used to perform random access in those files. We use two techniques from the Gzip specification—multiple compressed blocks and the Gzip header flags format—to make files seekable.

Multiple compressed blocks

A Gzip file consists of a series of “members” (compressed data sets). The format of each member is specified as follows:

   <Header>Compressed Data<Footer>

As a Gzip file can have a series of such members, an actual Gzip file can look like this:

   <Header>Compressed Data<Footer><Header>Compressed Data<Footer>….

Such multiple compressed blocks can be produced using the Z_FULL_FLUSH option described in the zlib manual. These blocks let us read the file from an arbitrary location after jumping to the corresponding inner header. The main advantages are that the entire file need not be uncompressed and that reads can start from different locations: only the requested compressed block is uncompressed, which improves performance considerably.
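
As a rough, self-contained illustration of the multi-member layout itself (not GZinga's actual writer, which drives zlib's full-flush behavior), plain java.util.zip can already produce a file consisting of independent Gzip members:

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class MultiMemberGzipExample {
    public static void main(String[] args) throws IOException {
        // Hypothetical input: three chunks that become three independent gzip members.
        String[] chunks = {"block one\n", "block two\n", "block three\n"};
        try (OutputStream out = new FileOutputStream("multi-member.gz")) {
            for (String chunk : chunks) {
                // Each member gets its own header, compressed data, and footer,
                // so it can be decompressed without touching the preceding members.
                GZIPOutputStream member = new GZIPOutputStream(out);
                member.write(chunk.getBytes(StandardCharsets.UTF_8));
                member.finish(); // writes this member's footer but keeps 'out' open
            }
        }
    }
}

Standard tools (gunzip, and recent versions of Java's GZIPInputStream) treat such a concatenation as a single logical file, which is what keeps these files readable by ordinary Gzip consumers.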

As the deflater gets reset for every block, compression efficiency (both size and processing) diminishes. However, we’ve observed that when data is compressed using zlib, then the impact on the amount of compression is minimal. The following table shows the difference in compression and timing for logs, with and without the multiple-blocks approach. In all of these scenarios, 60 blocks are written—1 per minute for an hourly log file.

Original file size (MB) | Time with compressed blocks (sec) | Time without compressed blocks (sec) | Δ increase in time (sec) | File size with compressed blocks (MB) | File size without compressed blocks (MB) | % increase in file size
8.12    | 0.41  | 0.387 | 0.023 | 1.078   | 1.076   | 0.19
105.5   | 3.685 | 3.49  | 0.195 | 13.982  | 13.981  | 0.007
527.86  | 14.54 | 14.12 | 0.420 | 69.904  | 69.903  | 0.001
2111.47 | 53.44 | 52.6  | 0.840 | 279.614 | 279.6   | 0.0005

It is evident that when the size per block is greater than 1 MB, the processing and space overhead is minimal and can be ignored. (A much smaller block size is generally unnecessary, since a block of around this size is already fast to decompress.)

Header flags format

The header section has provision for extra comments, according to the Gzip specification. Each header has the following format:

+---+---+---+---+---+---+---+---+---+---+
|ID1|ID2|CM |FLG|     MTIME     |XFL|OS | (more-->)
+---+---+---+---+---+---+---+---+---+---+
  1   2   3   4   5   6   7   8   9   10

The 4th byte of the header represents the flag information:

bit 0   FTEXT
bit 1   FHCRC
bit 2   FEXTRA
bit 3   FNAME
bit 4   FCOMMENT
bit 5   reserved
bit 6   reserved
bit 7   reserved

When bit 4 (FCOMMENT) is set, the header carries a zero-terminated comment. We can use this comment field to store an index for the file. The interface for recording the index is shown below in the GZipOutputStreamRandomAccess class:

class GZipOutputStreamRandomAccess extends DeflaterOutputStream {

    // In-memory index: the value is the byte offset in the gzip file for the given key.
    private Map<Long, Long> offsetMap = new LinkedHashMap<Long, Long>();

    /** Records the current byte offset (in the gzip file) for the given key. */
    public void addOffset(Long key) {
    }

    /** Writes a header whose comment field contains the entries from offsetMap. */
    private void writeHeaderWithComment() {
    }
}
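
A hedged usage sketch (the constructor shape is an assumption; the keys are timestamps, matching the hourly log use case described earlier):

// Assumption: the stream wraps its target like other DeflaterOutputStream subclasses.
GZipOutputStreamRandomAccess gzout =
        new GZipOutputStreamRandomAccess(new FileOutputStream("app.log.gz"));

gzout.addOffset(1234567L);          // remember where the block for this key starts
gzout.write(firstMinuteOfLogs);     // byte[] of log lines for that period (hypothetical variable)

gzout.addOffset(1234627L);          // next block, next key
gzout.write(secondMinuteOfLogs);

gzout.close();                      // the final block's header carries the complete index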

The comment section in the header field contains the index entries collected up to that point. Note that standard Gzip libraries skip the comment field, so these files remain readable by ordinary tools. The comments are added as name/value pairs as shown below:

+---+---+---+---+---------------+---+---+------------------------------------+
|ID1|ID2|CM |FLG|     MTIME     |XFL|OS | key1:value1;key2:value2;........;0 |
+---+---+---+---+---------------+---+---+------------------------------------+
  1   2   3   4   5   6   7   8   9   10  index entries ending with a 0 byte

At the time of the file close operation, an extra block is added whose header contains all of the index entries and whose footer carries no data. This extra block makes it possible to locate the header deterministically while reading, as described in the next section. The comment block increases the effective file size as shown below (in a test with 60 blocks):

Original file size (MB) | Time with index data (sec) | Time without index data (sec) | Δ increase in time (sec) | File size with index data (MB) | File size without index data (MB) | % increase in file size
8.12    | 0.582 | 0.387 | 0.195 | 1.154  | 1.08  | 6.85
105.5   | 3.926 | 3.49  | 0.436 | 14.09  | 13.98 | 0.79
527.86  | 15    | 14.12 | 0.880 | 70.02  | 69.9  | 0.17
2111.47 | 54.35 | 52.6  | 1.750 | 279.72 | 279.6 | 0.04

Seekable Gzip: read

To take advantage of the index written in the header, the file is first opened for random access and the last few chunks of data are read. The final header is located by byte comparison (reading in reverse order), and the index information is extracted from its comment field and loaded into memory. Based on the requested key, the read marker jumps to the recorded offset and the stream is handed to Gzip for decompression of that block. The interface for jumping to a desired location and reading the metadata is shown in the GZipInputStreamRandomAccess class below:

class GZipInputStreamRandomAccess extends GZIPInputStream {

    /** Returns the index (key -> byte offset) stored in the file's header comments. */
    public Map<Long, Long> getMetadata();

    /** Jumps to the location recorded for the specified key. If the key does not exist,
        jumps to the beginning of the file. */
    public void jumpToIndex(Long index) throws IOException;
}

Reading the index and then jumping to the desired location improves performance dramatically. The following table provides the numbers for jumping to the last block of data and reading one record from there:

Original file size (MB) | Compressed file size (MB) | Time with index data (sec) | Time without index data (sec)
8.12    | 1.08  | 0.162 | 0.266
105.5   | 13.98 | 0.166 | 0.87
527.86  | 69.9  | 0.167 | 3.59
2111.47 | 279.6 | 0.17  | 13.71

We also observed that the time taken is roughly the same regardless of which index is jumped to before reading a record. Overall, this approach provides a significant performance improvement, with the extent of improvement depending on file size (e.g., a 10-50x gain).

Random access in Hadoop

SeekableGZipDataInputStream is an extension of Hadoop’s FSDataInputStream class to enable random access for Gzip files that are stored inside Hadoop. Using this extended class, one can jump to any location in a Gzip file stored in Hadoop and read required data from there with much faster performance. Here is an example of using this class to perform random access:

// 'fs' is an org.apache.hadoop.fs.FileSystem handle for the cluster.
FSDataInputStream fin = fs.open(new Path("testfile"));
long len = fs.getFileStatus(new Path("testfile")).getLen();
SeekableGZipDataInputStream sin = new SeekableGZipDataInputStream(fin, len);
GZipInputStreamRandomAccess gzin = new GZipInputStreamRandomAccess(sin);
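
Continuing that snippet, a hedged sketch of how the index might then be used to jump into the file and read from there (method names come from the interface above; the key chosen is arbitrary):

Map<Long, Long> index = gzin.getMetadata();          // key -> byte offset written by the writer
gzin.jumpToIndex(index.keySet().iterator().next());  // seek to the block recorded for some key
BufferedReader br = new BufferedReader(new InputStreamReader(gzin));
String firstRecord = br.readLine();                  // read records from that block onward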

Splittable Gzip

In many Hadoop production environments, Gzipped files arrive as the raw input. When such files are put into Hadoop, an MR job runs with only one map task, because a Gzip file is not splittable. This is Gzip's biggest disadvantage in that setting, as it defeats the real power of Hadoop. But when Gzip files are generated with the Seekable approach described above, it becomes possible to split a Gzip file and feed each chunk to an individual map task for processing.

Determining the split

A Gzip file with multiple compressed blocks allows the file to be split at the start of a new header. In our approach, we implemented Hadoop's SplittableCompressionCodec interface for the GzipCodec class. When a split is requested with given “start” and “end” positions, the codec locates the start of the next header from each provided position and moves the corresponding pointer there. For example, given the following split as the original (or default):

[Figure: the default split boundaries within the Gzip file]

The new “start” and “end” location will be set as shown below:

[Figure: the “start” and “end” positions moved forward to the next Gzip headers]

Compressed data before the “start” position will be processed by the previous split (both the “start” and “end” positions will point to the next header).
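
A simplified sketch of that header search (illustrative only, not GZinga's implementation; real code validates more of the header than the two magic bytes):

// Scan forward from 'start' for the gzip magic bytes ID1=0x1f, ID2=0x8b and return the
// offset of the next member header, or 'end' if no header falls inside this range.
static long findNextHeader(FSDataInputStream in, long start, long end) throws IOException {
    in.seek(start);
    int prev = in.read();
    for (long pos = start + 1; pos < end && prev != -1; pos++) {
        int cur = in.read();
        if (prev == 0x1f && cur == 0x8b) {
            return pos - 1; // candidate header; a real implementation also checks CM, FLG, etc.
        }
        prev = cur;
    }
    return end; // no header here; the compressed data belongs to the previous split
}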

Configuration changes

In order to use this feature, one needs to set the io.compression.codecs property in the Configuration object before submitting the MR job. Instead of listing org.apache.hadoop.io.compress.GzipCodec for Gzip files, the value should include io.gzinga.hadoop.SplittableGzipCodec:

io.gzinga.hadoop.SplittableGzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec

This value can also be set in the mapred-site.xml file to take effect for all MR jobs.
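
For example, a hedged sketch of setting this per job through the Configuration object (codec values as listed above; the split-size property is discussed next):

Configuration conf = new Configuration();
// Register the splittable Gzip codec ahead of the stock codecs.
conf.set("io.compression.codecs",
        "io.gzinga.hadoop.SplittableGzipCodec,"
      + "org.apache.hadoop.io.compress.DefaultCodec,"
      + "org.apache.hadoop.io.compress.BZip2Codec,"
      + "org.apache.hadoop.io.compress.SnappyCodec");
// Cap the split size at 64 MB (value in bytes); see the discussion of
// mapreduce.input.fileinputformat.split.maxsize below.
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);
Job job = Job.getInstance(conf, "wordcount-over-splittable-gzip");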

Another important property is the split size. The property mapreduce.input.fileinputformat.split.maxsize indicates the maximum split size; the number of splits depends on this value and the actual Gzip file size. Up to a point, the more splits there are, the faster the MR job. Below are performance numbers showing the improvement with respect to splits (the input Gzip file is 280 MB, and the MR job performs the equivalent of the wc command on the file):

Split size (MB) | # of splits | Avg map time (sec) | Job time (sec)
32  | 9 | 24  | 50
64  | 5 | 36  | 55
128 | 3 | 52  | 79
300 | 1 | 127 | 139

As these performance numbers show, the map task took 127 seconds when run with a single split, compared to 52 seconds when run with 3 map tasks. Also note that when the split size decreases (and the number of splits increases), the map task time decreases accordingly.

Conclusion

Gzip provides one of the best and most widely used compression algorithms in the industry. But with its lack of support for random access and file splitting, it cannot leverage the power of parallel and distributed processing systems like Hadoop. With the introduction of the Seekable and Splittable features, high-performance access to Gzip data from Hadoop becomes possible.

Oink: Making Pig Self-Service

The Platform and Infrastructure team at eBay Inc. is happy to announce the open-sourcing of Oink – a self-service solution for Apache Pig.

Pig and Hadoop overview

Apache Pig is a platform for analyzing large data sets. It uses a high-level language for expressing data analysis programs, coupled with the infrastructure for evaluating these programs. Pig abstracts the Map/Reduce paradigm, making it very easy for users to write complex tasks using Pig’s language, called Pig Latin. Because execution of tasks can be optimized automatically, Pig Latin allows users to focus on semantics rather than efficiency. Another key benefit of Pig Latin is extensibility: users can do special-purpose processing by creating their own functions.

Apache Hadoop and Pig provide an excellent platform for extracting and analyzing data from very large application logs. At eBay, we on the Platform and Infrastructure team are responsible for storing the TBs of logs that are generated every day by thousands of eBay application servers. Hadoop and Pig offer us an array of tools to search and view logs and to generate reports on application behavior. Because the logs are available in Hadoop, application engineers can also use Hadoop and Pig for custom processing, such as writing Pig scripts to extract useful information.

The problem

Today, Pig is primarily used through the command line to spawn jobs. This model wasn’t well suited to the Platform team at eBay, as the cluster that housed the application logs was shared with other teams. This situation created a number of issues:

  • Governance – In a shared cluster, governance is critical: the Pig scripts and requests of one customer should not impact those of other customers and stakeholders of the cluster. Moreover, providing CLI access would make it difficult to control the number of job submissions.
  • Scalability – Giving CLI access to all Pig customers created another challenge: onboarding customers was time-consuming and cumbersome.
  • Change management – There was no easy way to upgrade or modify common libraries.

Hence, we needed a solution that acted as a gateway to Pig job submission, provided QoS, and abstracted the user from cluster configuration.

The solution: Oink

Oink solves the above challenges not only by allowing execution of Pig requests through a REST interface, but also by enabling users to register jars, view the status of Pig requests, view Pig request output, and even cancel a running Pig request. With the REST interface, the user has a cleaner way to submit Pig requests compared to CLI access. Oink serves as a single point of entry for Pig requests, thereby facilitating rate limiting and QoS enforcement for different customers.

Oink runs as a servlet inside a web container and allows users to run multiple requests in parallel within a single JVM instance. This capability was not supported by Pig out of the box; it required the patch found in PIG-3866, which adds multi-tenant support so that different users can share the same instance.

With Oink, eBay’s Platform and Infrastructure team has been able to onboard 100-plus different use cases onto its cluster. Currently, more than 6000 Pig jobs run every day without any manual intervention from the team.

Special thanks to Vijay Samuel, Ruchir Shah, Mahesh Somani, and Raju Kolluru for open-sourcing Oink. If you have any queries related to Oink, please submit an issue through GitHub.