Data IS the next currency. The increased demand for real-time data across almost every business and technology platform has changed the world we live in. It is no different at eBay.

About two years ago, I was thrilled when I was asked to lead a development team to build a real-time data platform at eBay using Kafka. Initially, it was just for our Oracle change stream. In late 2015, we decided to expand it to a fully managed, secure, and easy-to-use real-time data platform, known as Rheos. The goal of Rheos is to provide a near real-time buyer experience, seller insights, and a data-driven commerce business at eBay.

While Kafka has given us core capabilities in stream processing, managing large, distributed, highly available, real-time data pipelines that run in the cloud across security zones and data centers is hard without automation and core services. Hence, Rheos was built to provide the necessary lifecycle management, monitoring, and well-architected standards and ecosystem for real-time streaming data pipelines. Currently, the pipelines consist of Kafka, Storm, and stream processing applications. Both shared and non-shared data streams can run on these pipelines.

By the end of 2016, nearly 100 billion messages flowed through the pipelines in Rheos daily. In 2017, Rheos is expected to handle 15 times the current traffic.

So, how did we get there?

Concepts

At a very high level, Rheos has these concepts:

  • Data taxonomy is a well-defined convention that classifies and catalogs events into proper namespaces for organization, ease of discovery, and management.
  • Category is a top-level component in a namespace for a given stream type, for example, monitoring events, click stream events, business events, and so on.
  • Stream captures the logical data flow that leads to a consumable data point in Kafka. The data flow may cut across one or more data points and stream processing units.
  • Domain represents a shard or a group of related topics for a given stream type. Topics in the group are subject to a set of control parameters, for example max partitions, max replicas, max data retention period, max topic count, and service level agreement.
  • Namespace is used to classify the different data streams in Rheos. A namespace is composed of category, stream, and domain.
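
To make the relationship between these concepts concrete, here is a minimal Python sketch. The class names, the dot-separated naming format, and the specific limit fields are assumptions made for illustration; they are not the actual Rheos data model.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DomainLimits:
    """Hypothetical control parameters attached to a domain."""
    max_partitions: int
    max_replicas: int
    max_retention_hours: int
    max_topic_count: int


@dataclass(frozen=True)
class Namespace:
    """A namespace is composed of category, stream, and domain."""
    category: str
    stream: str
    domain: str

    def qualified_name(self) -> str:
        # The dot-separated form is an assumption, not a documented format.
        return f"{self.category}.{self.stream}.{self.domain}"


def topic_request_allowed(limits, partitions, replicas, retention_hours, existing_topics):
    """Return True if a new topic request fits within the domain's limits."""
    return (
        partitions <= limits.max_partitions
        and replicas <= limits.max_replicas
        and retention_hours <= limits.max_retention_hours
        and existing_topics < limits.max_topic_count
    )
```

A request for a topic in a domain would be checked against that domain's limits before the topic is created, which is one way the control parameters could be enforced.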

Automation

Lifecycle Management Service

Lifecycle Management Service is a cloud service that provisions and provides full lifecycle management (LCM) for Zookeeper, Kafka, Storm, and MirrorMaker clusters. It is built on a modular architecture with pluggable extensions and frameworks. This combination allows it to create and perform LCM on a stream pipeline running on any cloud platform (such as OpenStack, AWS, or Google Cloud). The Lifecycle Management Service allows you to provision a cluster, flex it up or down, or replace a bad node in it. In addition to its CLI, it is equipped with a RESTful API that allows the Rheos Management Service (see Core Service below) to perform simple operations on a guest instance. For example, the Management Service can do a rolling restart on a troubled Kafka cluster via the Lifecycle Manager API.

The architectural building blocks of the Lifecycle Management Service consist of these components:

  • API Server (REST and CLI) — a thin layer that parses, validates, and forwards requests to Task Manager
  • Task Manager (RPC) — a stateful service that creates and executes orchestration workflows on a cluster of nodes
  • Conductor — a component that is responsible for receiving heartbeat information from the guest instances
  • Guest Agent — a lightweight agent that runs on the guest instance; responsible for executing a command from the Task Manager on the instance as well as sending heartbeat metrics to the Conductor
  • Message Queue — a scoped, controlled, and secured way for the communication between the API Server, Task Manager, Conductor and the Guest Agent

The pluggable extension includes these functions:

  • Workflow
  • Monitoring and metrics emitter and aggregator
  • Authentication and authorization
  • Configuration management
  • IaaS (the underlying compute, storage, resource management, etc.)

Core Service

Rheos core service consists of the following components: Kafka Proxy Server, Schema Registry Service, Stream Metadata Service, and Management Service. The following picture captures how these components interact with each other.

Rheos Kafka Proxy Server

One of Rheos’ key objectives is to provide a single point of access to the data streams for producers and consumers without hard-coding the actual broker names. This allows any open-source Kafka connector, framework, or Kafka client written in any programming language to seamlessly produce to or consume from Rheos.

To do this, we created a Rheos Kafka Proxy Server that handles the Kafka TCP protocol so that it can intercept the initial connection requests from clients. Upon receiving an initial connection request, the Proxy Server identifies which Kafka cluster the topic resides on via the Rheos Metadata Service (described below). The actual broker CNAMEs are then returned to the client so that it can complete the final connection handshake with the brokers.

In addition, the Rheos Kafka Proxy Server allows Operations to easily replace a bad node or move a topic from one Kafka cluster to another with little to no impact on clients.
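
The lookup the Proxy Server performs on an initial connection can be sketched roughly as follows. This is a toy in-memory model, not the Rheos implementation: `ToyMetadataService`, `handle_initial_connection`, and the broker host names are all hypothetical, and the real proxy works at the Kafka wire-protocol level rather than through Python calls.

```python
class ToyMetadataService:
    """Hypothetical stand-in for the Metadata Service lookups the proxy needs."""

    def __init__(self, topic_to_cluster, cluster_to_brokers):
        self.topic_to_cluster = dict(topic_to_cluster)
        self.cluster_to_brokers = {c: list(b) for c, b in cluster_to_brokers.items()}

    def brokers_for(self, topic):
        """Return the broker addresses of the cluster that hosts the topic."""
        cluster = self.topic_to_cluster.get(topic)
        if cluster is None:
            raise LookupError(f"topic {topic!r} is not registered")
        return list(self.cluster_to_brokers[cluster])

    def move_topic(self, topic, new_cluster):
        """Re-point a topic at another cluster (data movement not modeled)."""
        if new_cluster not in self.cluster_to_brokers:
            raise LookupError(f"unknown cluster {new_cluster!r}")
        self.topic_to_cluster[topic] = new_cluster


def handle_initial_connection(metadata, topic):
    """What the proxy hands back to a client: the real broker addresses."""
    return metadata.brokers_for(topic)
```

Because clients only ever know the proxy address, moving a topic changes the answer to the next lookup without any change to client code, which is the property the paragraph above describes.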

Schema Registry Service

To promote data hygiene in Rheos and ease of use for both stream producers and consumers, each event in Rheos must be identifiable with an Avro schema. Rheos has built a Schema Registry Service based on the confluent.io Schema Registry. This service hosts data format definitions and provides schema versioning and serialization information for each event type. In addition, Rheos users can view, insert, and update the schemas in the registry.
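
As an illustration of schema versioning, the sketch below keeps Avro schemas (written as plain Python dicts) in a toy in-memory registry. `ToySchemaRegistry` and the `PageView` schema are invented for this example; this is not the Confluent Schema Registry API or the Rheos service, only a model of the idea that each event type has numbered schema versions.

```python
import json


class ToySchemaRegistry:
    """In-memory stand-in for a schema registry (illustrative only)."""

    def __init__(self):
        self._versions = {}  # subject -> list of canonical schema strings

    def register(self, subject, schema):
        """Store a schema and return its 1-based version number.

        Registering an identical schema again returns the existing version.
        """
        canonical = json.dumps(schema, sort_keys=True)
        versions = self._versions.setdefault(subject, [])
        if canonical in versions:
            return versions.index(canonical) + 1
        versions.append(canonical)
        return len(versions)

    def latest(self, subject):
        """Return (version, schema) for the newest schema of a subject."""
        versions = self._versions[subject]
        return len(versions), json.loads(versions[-1])


# Hypothetical Avro record schema for a page-view event.
PAGE_VIEW_V1 = {
    "type": "record",
    "name": "PageView",
    "namespace": "com.example.events",
    "fields": [
        {"name": "eventId", "type": "string"},
        {"name": "timestampMs", "type": "long"},
    ],
}

# Version 2 adds an optional field with a default value.
PAGE_VIEW_V2 = {
    **PAGE_VIEW_V1,
    "fields": PAGE_VIEW_V1["fields"]
    + [{"name": "referrer", "type": ["null", "string"], "default": None}],
}
```

Adding a nullable field with a default is the kind of change that lets consumers using the new schema still read events written with the old one.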

Rheos Metadata Service

Stream Metadata Service provides a system of record for each stream and the associated producer and consumer(s) that are known to the system. Prior to producing to or consuming from a stream, one must “register” the Kafka topic along with the associated schema, stream producer, and consumer with the Metadata Service. With this, Kafka topics and broker lists, along with the associated schemas, can easily be discovered or browsed via the Rheos REST API or Portal. More importantly, no hard coding of broker names in the client code! In addition, the Metadata Service makes it possible for our Management Service and Health Check System to seamlessly monitor, alert, and perform lifecycle management operations on streams and the infrastructure that the streams run on.

The recorded information includes the following items:

  • The physical (cluster) location of a topic or a stream processing job/topology
  • Data durability, retention policy, partition, producer, and consumer information
  • Source and target data mirroring information
  • Default configuration for Zookeeper, Kafka, and Storm
  • Topic schema information
  • And more

Management Service

Rheos performs stream, producer, and consumer lifecycle management operations with a set of predefined Standard Operating Procedures (SOPs) in the Management Service. Each SOP has a series of steps that can be performed on a guest instance via the Lifecycle Management Service. For example, Operations can initiate a rolling restart of a Kafka cluster using one of the SOPs.
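
A rolling restart SOP can be thought of as an ordered loop that restarts one node, waits for it to recover, and only then moves on. The sketch below assumes hypothetical `restart` and `is_healthy` callables that would wrap calls to the Lifecycle Management Service; the actual SOP steps and API are not described in this article.

```python
def rolling_restart(nodes, restart, is_healthy, max_checks=3):
    """Restart nodes one at a time, stopping at the first node that fails to recover.

    `restart` and `is_healthy` are caller-supplied callables (hypothetical
    wrappers around a lifecycle API). A real procedure would also sleep
    between health checks; that is omitted here to keep the sketch short.
    """
    restarted = []
    for node in nodes:
        restart(node)
        for _ in range(max_checks):
            if is_healthy(node):
                break
        else:
            # Never touch the next node while this one is still unhealthy.
            raise RuntimeError(f"{node} did not recover; restarted so far: {restarted}")
        restarted.append(node)
    return restarted
```

Aborting on the first unhealthy node keeps at most one broker out of service at a time, which is the point of doing the restart in a rolling fashion.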

Health Check System

This service monitors the health of each asset (for example, a Kafka, Zookeeper, or MirrorMaker node) that is provisioned through the Lifecycle Management Service in these aspects:

  • Node state (up or down)
  • Cluster health
  • Producer traffic, consumer lags, or data loss

It periodically samples data from Kafka topics and performs consumer lag checks and end-to-end latency checks via the Management Service. Upon detecting an anomaly or error, the service sends an alert via email and/or to eBay Operations. In addition, the Health Check Service records a consumer’s current offset with a timestamp in the primary and the secondary Kafka clusters.

Producer traffic

Producer traffic is closely monitored and can be viewed on the Rheos Portal. To provide a quick visual of a producer’s traffic trend or pattern, the current traffic volume of a stream domain (that is, a topic group with selected or all partitions) is overlaid on top of the previous day’s traffic pattern. This way, one can quickly detect an anomaly in the current traffic.
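
The same day-over-day comparison can be done programmatically. The following sketch flags the time buckets where today's message count deviates from yesterday's by more than a relative tolerance. The function name, the bucket representation, and the 50% default are assumptions for illustration; the article does not say how, or whether, the Portal automates this comparison.

```python
def traffic_anomalies(today, yesterday, tolerance=0.5):
    """Return indexes of time buckets whose traffic deviates from yesterday's.

    `today` and `yesterday` are message counts per time bucket, aligned by
    position. A bucket is flagged when the relative change exceeds `tolerance`.
    """
    flagged = []
    for index, (now, before) in enumerate(zip(today, yesterday)):
        if before == 0:
            # No baseline: treat any traffic at all as worth a look.
            if now > 0:
                flagged.append(index)
            continue
        if abs(now - before) / before > tolerance:
            flagged.append(index)
    return flagged
```

For example, with `today = [100, 40, 120]` and `yesterday = [100, 100, 100]`, only the second bucket is flagged, since a 60% drop exceeds the 50% tolerance while a 20% rise does not.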

End-to-end latency

A question that everyone asks is what the end-to-end data latency or consumer lag is in a stream pipeline. The Rheos Health Check System provides a stream domain’s end-to-end latency by measuring two periods of time:

  • From when an event is published to Kafka to the time when the event is consumed by a consumer
  • From when an event is published to Kafka to the time when the broker writes to disk
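
Both measurements are simple timestamp differences, and consumer lag is the gap between the newest offset in a partition and the consumer's committed offset. The sketch below assumes the three timestamps are already available per sampled event; `EventTiming` and the field names are hypothetical, and how Rheos actually collects these timestamps is not described here.

```python
from dataclasses import dataclass


@dataclass
class EventTiming:
    """Timestamps (epoch milliseconds) for one sampled event."""
    published_ms: int
    persisted_ms: int   # when the broker wrote the event to disk
    consumed_ms: int


def latency_summary(timings):
    """Return the worst-case latency for each of the two measured periods."""
    publish_to_consume = [t.consumed_ms - t.published_ms for t in timings]
    publish_to_disk = [t.persisted_ms - t.published_ms for t in timings]
    return {
        "max_publish_to_consume_ms": max(publish_to_consume),
        "max_publish_to_disk_ms": max(publish_to_disk),
    }


def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition lag: newest offset minus the consumer's committed offset."""
    return {
        partition: end - committed_offsets.get(partition, 0)
        for partition, end in log_end_offsets.items()
    }
```

A partition missing from `committed_offsets` is treated as never consumed, so its lag equals its full log end offset.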

Stream consistency check

To quickly remediate a problem in a stream, the Health Check System proactively monitors the set of in-sync replicas (ISR) for a given topic in a stream. It also ensures that the stream the topic flows through remains consistent across one or more Kafka clusters.
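
One common ISR check is to look for under-replicated partitions, meaning partitions whose in-sync replica set is smaller than the assigned replica set. The sketch below works on a plain dict of partition metadata; the input shape and the function name are assumptions, and the article does not specify which ISR conditions Rheos alerts on.

```python
def under_replicated_partitions(partitions):
    """Return the partitions whose ISR does not cover all assigned replicas.

    `partitions` maps a partition id to a dict with "replicas" and "isr",
    each a list of broker ids.
    """
    return sorted(
        partition
        for partition, info in partitions.items()
        if set(info["isr"]) != set(info["replicas"])
    )
```

A non-empty result means at least one replica has fallen behind or is offline, which is usually the first sign of a broker problem.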

Node status

Last but not least, our Health Check System also monitors the state of each node in Rheos. At a high level, it provides a quick overview of the cluster health by checking these conditions:

  • Whether a node is reachable or not
  • Whether the primary workload (broker, Zookeeper, etc.) is running on a reachable node
  • Whether a randomly selected node in a cluster can properly fulfil a request or not
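
These three checks can be combined into a small overview routine. In the sketch below, `reachable`, `workload_running`, and `can_serve` are hypothetical probe callables supplied by the caller, and choosing the random probe target only from nodes that passed the first two checks is a design assumption of this example.

```python
import random


def cluster_overview(nodes, reachable, workload_running, can_serve, rng=random):
    """Classify each node, then send one test request to a random healthy node.

    Returns (status_by_node, probe_ok). The probes are injected so the
    routine itself stays free of network code.
    """
    status = {}
    for node in nodes:
        if not reachable(node):
            status[node] = "unreachable"
        elif not workload_running(node):
            status[node] = "workload-down"
        else:
            status[node] = "ok"

    healthy = [node for node in nodes if status[node] == "ok"]
    # With no healthy node there is nothing to probe, so the probe fails.
    probe_ok = bool(healthy) and bool(can_serve(rng.choice(healthy)))
    return status, probe_ok
```

Separating "reachable" from "workload running" distinguishes a host or network failure from a crashed broker process, which usually call for different remediation.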

Rheos Mirroring Service

In addition to Kafka’s cluster replication, Rheos Mirroring Service provides high data availability and integrity by mirroring data from a source cluster to one or more target clusters. Built around Kafka’s MirrorMaker, the service is used to set up MirrorMaker instances and mirror a group of topics from one cluster to another via a REST API. Through the API, one can start and stop the mirroring of a topic group.

Rheos Mirroring Service consists of these key components:

  • Asset Agent is co-located on a mirroring compute node and responsible for reporting heartbeat metrics to a State Store.
  • Mirror Manager is a REST service that starts and stops the mirroring of a topic group. It is equipped with the intelligence to properly distribute the MirrorMaker instances across the cluster based on a distribution strategy.
  • Configurator is an Ansible playbook that resides on each MirrorMaker node. It is responsible for these functions:
    • Creating the required Kafka producer/consumer properties for a topic group
    • Creating the required directory structure for the instance along with the supervisor configuration
    • Starting or stopping the MirrorMaker instance based on the given source to target mirroring configuration
  • Mirror Bootstrap is a thin Java wrapper that registers and deregisters the MirrorMaker instance in the State Store prior to interacting with the underlying MirrorMaker instance. This allows us to capture the physical and the logical data mirroring activities.
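
Two of these responsibilities lend themselves to a short sketch: generating the consumer and producer properties for a topic group, and deciding which node should run each MirrorMaker instance. The code below is illustrative only. The property keys (`bootstrap.servers`, `group.id`, `acks`) are standard Kafka client settings, but the group naming scheme and the least-loaded placement strategy are assumptions; the actual Configurator is an Ansible playbook and the real distribution strategy is not described here.

```python
def render_properties(props):
    """Render a dict as Java-style .properties text with sorted keys."""
    return "\n".join(f"{key}={value}" for key, value in sorted(props.items())) + "\n"


def mirror_configs(topic_group, source_brokers, target_brokers):
    """Build consumer (source side) and producer (target side) properties."""
    consumer = {
        "bootstrap.servers": ",".join(source_brokers),
        "group.id": f"mirror-{topic_group}",  # naming scheme is hypothetical
    }
    producer = {
        "bootstrap.servers": ",".join(target_brokers),
        "acks": "all",  # wait for all in-sync replicas before acknowledging
    }
    return render_properties(consumer), render_properties(producer)


def place_instances(topic_groups, node_load):
    """Assign each topic group to the currently least-loaded mirroring node.

    `node_load` maps node name to the number of instances it already runs.
    Ties are broken by node name so the result is deterministic.
    """
    load = dict(node_load)
    placement = {}
    for group in topic_groups:
        node = min(load, key=lambda name: (load[name], name))
        placement[group] = node
        load[node] += 1
    return placement
```

Placing each new instance on the least-loaded node keeps the number of MirrorMaker instances per node roughly even as topic groups are added.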

Using the Mirroring Service to achieve high availability

As shown below, data can be mirrored from one region or availability zone to one or more regions or availability zones for high availability. To do that, MirrorMaker instances are set up in the target locations to consume data from a source cluster and subsequently publish to target clusters.

Using the Mirroring Service to move data across security zones

In addition, Data Mirroring is used to provide data movement from one security zone to another. As shown below, MirrorMaker instances are set up in the target security zone to consume data from the source security zone over a TLS connection and subsequently publish the received data to the target clusters.

How to access Kafka securely?

To acquire a broker connection, a Rheos client must be authenticated by the eBay Identity Service via the Kafka SASL mechanism. Upon authentication, the client is then further authorized through Kafka’s default pluggable Authorizer via Zookeeper.

In some cases, such as moving data across security zones, TLS is also enabled at the connection level.
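
From the client's point of view, these choices show up as a few connection settings. The sketch below builds them as a plain dict using the standard Kafka client keys `bootstrap.servers`, `security.protocol`, and `sasl.mechanism`. The proxy address and the mechanism value passed in are placeholders: the article does not name the SASL mechanism that backs the eBay Identity Service integration.

```python
def client_security_config(proxy_address, sasl_mechanism, use_tls):
    """Build Kafka client security settings as a plain dict.

    SASL is always on; TLS is added when `use_tls` is True, for example
    when the connection crosses security zones.
    """
    return {
        "bootstrap.servers": proxy_address,
        "security.protocol": "SASL_SSL" if use_tls else "SASL_PLAINTEXT",
        "sasl.mechanism": sasl_mechanism,
    }


# Placeholder values for illustration only.
cross_zone = client_security_config("rheos-proxy.example.com:9092", "PLAIN", use_tls=True)
```

`SASL_PLAINTEXT` authenticates the client without encrypting the connection, while `SASL_SSL` adds TLS on top of the same authentication step.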

Conclusion

Rheos has opened a new chapter in many aspects at eBay. With Rheos, eBay data can now be securely extracted and moved from a data store, application, or other source to one or more locations in real time. Stream processing has opened up new possibilities for eBay's businesses, fraud detection, monitoring, analytics, and more.