Email Tech Is Now Ad Tech


 

eBay has come a long way in CRM and email marketing over the past two years. Personalization is a relatively easy task when you’re dealing with just one region, one vertical, and a hundred thousand customers. With 167M active buyers across the globe, eBay’s journey to help each of our buyers find their version of perfect was quite complex.

Like many in our industry, we’ve had to deal with legacy systems, scalability, and engineering resource constraints. And yet, we’ve made email marketing a point of pride — instead of the “check mark” that we started from. Here’s our story.

Our starting point was a batch-and-blast approach. Our outbound communications very much reflected our organizational structure: as a customer, I’d get a fashion email on Monday, a tech email on Tuesday, and a motors email on Wednesday. This, of course, wasn’t the kind of experience we wanted to create.

Additionally, for each of our marketing campaigns, we hand-authored targeting criteria — just as many of our industry colleagues do today in tools like Marketo and ExactTarget. This approach worked OK, but the resulting segment sizes were too large — in the hundreds of thousands. This meant that we were missing out on the opportunity to treat customers individually. It also didn’t scale well — as our business grew internationally, we needed to add more and more business analysts, and the complexity of our contact strategy was becoming unmanageable.

We wanted to create a structurally better experience for our customers — and our big bet was to go after 1:1 personalization using real-time data. We wanted to use machine learning to do the targeting, with real-time feedback loops powering our models.

Since email is such a powerful driver for eCommerce, we committed to a differentiated experience in this channel. After evaluating multiple off-the-shelf solutions, we settled on building an in-house targeting and personalization system — as the size of the eBay marketplace is astounding, and many opportunities and issues are quite unique. We set a high bar: every time we show an offer to a customer, it has to be driven by our up-to-the-minute understanding of what the customer did and how other customers are responding to this offer.

Here are some examples of the scenarios we targeted:

  • eBay has many amazing deals, and our community is very active. Deals quickly run out of inventory. We can’t send an offer to a customer and direct them to an expired deal. Thus, our approach involved open-time rendering of offers in email.
  • Some of our retail events turn out to be much more popular than we anticipate. We want to respond to this real-time engagement feedback by adjusting our recommendations quickly. We thus built a feedback loop that shows an offer to a subset of customers; then, if an event is getting a much higher click-through rate than we expected, we show it to more customers. If for some reason it isn’t doing well — for example, if the creative is underperforming — the real-time “bandit” approach reduces its visibility.
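
To make the second scenario concrete, here is a minimal sketch of one way such a "bandit" feedback loop could work: an epsilon-greedy selector that mostly serves the offer with the best observed click-through rate and occasionally explores, with impressions and clicks fed back in real time. The class, offer identifiers, and exploration rate are illustrative assumptions, not eBay's actual implementation.

    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    /** Illustrative epsilon-greedy offer selector: exploit the current best performer,
     *  occasionally explore, and update click-through estimates from real-time feedback. */
    public class EpsilonGreedyOfferSelector {
        private final Map<String, LongAdder> impressions = new ConcurrentHashMap<>();
        private final Map<String, LongAdder> clicks = new ConcurrentHashMap<>();
        private final Random random = new Random();
        private final double epsilon; // exploration rate, e.g. 0.1

        public EpsilonGreedyOfferSelector(double epsilon) {
            this.epsilon = epsilon;
        }

        /** Called at email open (or ad serve) time to pick the offer to render. */
        public String selectOffer(List<String> candidateOffers) {
            if (random.nextDouble() < epsilon) {
                return candidateOffers.get(random.nextInt(candidateOffers.size())); // explore
            }
            return candidateOffers.stream()
                    .max((a, b) -> Double.compare(clickThroughRate(a), clickThroughRate(b)))
                    .orElseThrow(); // exploit the best-performing offer so far
        }

        /** Real-time feedback: record an impression, and a click if one happened. */
        public void recordFeedback(String offer, boolean clicked) {
            impressions.computeIfAbsent(offer, k -> new LongAdder()).increment();
            if (clicked) {
                clicks.computeIfAbsent(offer, k -> new LongAdder()).increment();
            }
        }

        private double clickThroughRate(String offer) {
            long shown = impressions.getOrDefault(offer, new LongAdder()).sum();
            long clicked = clicks.getOrDefault(offer, new LongAdder()).sum();
            return shown == 0 ? 1.0 : (double) clicked / shown; // optimistic prior for unseen offers
        }
    }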

Both of these scenarios required us to have real-time CRM and engagement streams. That is, we needed to know when a customer opens an email or clicks on it, and based on this knowledge, instantaneously adjust our recommendations for other customers. This of course is miles away from a typical multi-step batch system based on ETL pipelines that most retailers have today. We were no different — we had to reengineer our delivery and data collection pipes to be real-time. The payoff, however, is quite powerful — this real-time capability is foundational to our ability to personalize better: both today and in the years to come.

The resulting solution transformed email marketing at eBay: instead of hundreds of small, uncoordinated campaigns each month, we now have a small set of “flagship” campaigns, each of which comprises one or more offers. Each of the offers is selected at the open time of the email, and the selection is made by a machine-learned model that uses real-time data. As a result, we saw significant growth in both engagement and sales driven by our emails.

You’ll notice that this component-level personalization approach is all about treating email content as an ad canvas. The problem is fundamentally similar: once you’ve captured the customer’s attention — be it via a winning bid in an ad auction, or by having that customer open your email — you need to find the most relevant offer to show. Each email slot can be thought of as a first-party ad slot. This realization allowed us to unify our approaches between display advertising and email: the same stack now powers both.

We extended this approach to scenarios like paid social campaigns, where Facebook would want to retrieve the offer from us a priori to manage their customer experience. We built a real-time push framework, where, whenever we find a deal that is better than what we previously apprised Facebook of, we immediately push that offer to Facebook.

This creates a powerful cross-channel multiplier: if we happen to see the customer on the display channel, the same ad-serving pipeline is engaged — and our flagship deal-finding campaign can be served to that customer, too. This means that evolving our flagship campaigns — adding more sophisticated machine learning, improving our creatives — contributes to all channels that are powered by this pipeline, not just email.

Orchestration across channels becomes possible, too: we can choose to send an email to a customer with a relevant offer; if they don’t open it, we can then target them with a display ad and an onsite banner; then, after showing the offer a set number of times across all channels, we can choose to stop the ad — implementing an effective cross-channel impression cap. And each condition for state transitions in this flow can itself be powered by a machine-learning model.
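
As an illustration of one piece of such orchestration, the sketch below shows a hypothetical cross-channel impression cap: impressions are counted per customer and offer across all channels, and the offer stops being served once the cap is reached. The class, key format, and cap are assumptions for illustration, not a description of eBay's system.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    /** Hypothetical cross-channel impression cap: stop serving an offer to a customer
     *  once it has been shown a set number of times across email, display, and onsite. */
    public class CrossChannelImpressionCap {
        private final int maxImpressions;
        // key: customerId + ":" + offerId; value: impressions summed across all channels
        private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();

        public CrossChannelImpressionCap(int maxImpressions) {
            this.maxImpressions = maxImpressions;
        }

        /** Returns true if the offer may still be shown to this customer on any channel. */
        public boolean mayServe(String customerId, String offerId) {
            return count(customerId, offerId).get() < maxImpressions;
        }

        /** Record that the offer was shown to the customer on some channel. */
        public void recordImpression(String customerId, String offerId) {
            count(customerId, offerId).incrementAndGet();
        }

        private AtomicInteger count(String customerId, String offerId) {
            return counts.computeIfAbsent(customerId + ":" + offerId, k -> new AtomicInteger());
        }
    }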

eBay’s scale creates a formidable engineering challenge for a true CRM. By putting our customers, and their behavioral signals, at the top of our priority list, we were able to create an asset in our CRM platform that positions us well in this journey towards 1:1 personalization. The single real-time, event-driven pipeline we’ve built allows coordinated, up-to-the-minute offers to be served — wherever we happen to see the customer.

Alex Weinstein (@alexweinstein) is the Director of Marketing Technologies and CRM at eBay and the author of the Technology + Entrepreneurship blog, where he explores data-driven decision making in the face of uncertainty. Prior to eBay, Alex was the head of product development at Wetpaint, a personalization tech startup.

Graphic: Rahul Rodriguez

An Approach to Achieve Scalability and Availability of Data Stores

 

Today’s explosion of the web, particularly of social networks and ecommerce applications, corresponds to an explosion in the sheer volume of data we must deal with. The web has become so ubiquitous that it is used by everyone, from the scientists of the 1990s, who used it for exchanging scientific documents, to five-year-olds today exchanging emoticons about kittens. Hence the need for scalability: the potential of a system, network, or process to be enlarged in order to accommodate that data growth. The web has virtually brought the world closer, which means there is no such thing as “down time” anymore. Business hours are 24/7, with buyers shopping in disparate time zones. Hence, too, the necessity for highly available data stores. This blog post provides a course of action for achieving scalability and availability in data stores.

This article covers the following methods for providing scalable and highly available data stores for applications.

  • Scalability: a distributed system with self-service scaling capability
    • Data capacity analysis
    • Review of data access patterns
    • Different techniques for sharding
    • Self-service scaling capability
  • Availability: physical deployment, rigorous operational procedures, and application resiliency
    • Multiple data center deployment
    • Self-healing tools
    • Well-defined DR tiering, RTO, RPO, and SOPs
    • Application resiliency for data stores

Scalability

With the advent of the web, especially Web 2.0 sites where millions of users may both read and write data, scalability of simple database operations has become more important. There are two ways to scale a system: vertically and horizontally. This post focuses on horizontal scalability, where both the data and the load of simple operations are distributed/sharded over many servers that do not share RAM, CPU, or disk. Although in some implementations disk and storage can be shared, auto-scaling can become a challenge in such cases.

[Diagram abstractly illustrating scalability measures. Image by freeimageslive.co.uk – freebie.photography]

The following measures should be considered as mandatory methods in building a scalable data store.

  • Data capacity analysis: It is very important to understand the application’s requirements in terms of peak and average transactions per second, peak number of queries, payload size, expected throughput, and backup requirements. This informs the scalability design of the data store: how many physical servers are needed, and the hardware configuration with respect to memory footprint, disk size, CPU cores, I/O throughput, and other resources.

  • Review data access patterns: The simplest course to scaling an application is to start by looking at access patterns. Given the nature of distributed systems, every real-time query to the data store must include the access key to avoid a scatter-gather problem across different servers. Data must be aligned by the access key in each of the shards of the distributed data store. In many applications, there can be more than one access key. For example, in an ecommerce application, data retrieval can be by Product ID or by User ID. In such cases, the options are to either store the data redundantly, aligned by both keys, or store the data with a reference key, depending upon the application’s requirements.

  • Different techniques for sharding: There are different ways to shard the data in a distributed data store. Two of the common mechanisms are function-based sharding and lookup-based sharding. Function-based sharding refers to a sharding scheme where a deterministic function is applied to the key to obtain the shard value. In this case, the shard key should exist in each entity stored in the distributed data store for efficient retrieval. In addition, if the shard key is not random, it can cause hot spots in the system. Lookup-based sharding refers to a lookup table used to store the start and end ranges of the keys; clients can cache the lookup table to avoid a single point of failure. Many NoSQL databases implement one of these techniques to achieve scalability. A generic sketch of both techniques follows this list.

  • Self-service scaling capability: Self-service scaling, or auto-scaling, can work as a jewel in the scalable system crown. Data stores are designed and architected to provide enough capacity to scale up front, but rapid elasticity and cloud services can enable vertical and horizontal scaling in the true sense. Self-service vertical scaling enables the addition of resources to an existing node to increase its capacity, while self-service horizontal scaling enables the addition or removal of nodes in the distributed data store via “scale-up” or “scale-down” functionality.
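
To make the two sharding techniques above concrete, here is a minimal, generic sketch of each: a hash (function-based) router and a range (lookup-based) router. The shard counts, ranges, and class names are illustrative assumptions rather than any specific product’s API.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    /** Illustrative sharding routers: function-based (hash) and lookup-based (range). */
    public class ShardRouters {

        /** Function-based sharding: a deterministic function of the access key picks the shard. */
        public static int hashShard(String accessKey, int shardCount) {
            // Math.floorMod keeps the result non-negative even if hashCode() is negative.
            return Math.floorMod(accessKey.hashCode(), shardCount);
        }

        /** Lookup-based sharding: a (client-cacheable) range table maps key ranges to shards. */
        public static class RangeLookupRouter {
            // start of range -> shard id; clients can cache this table locally.
            private final NavigableMap<Long, Integer> rangeTable = new TreeMap<>();

            public void addRange(long startInclusive, int shardId) {
                rangeTable.put(startInclusive, shardId);
            }

            public int shardFor(long numericKey) {
                Map.Entry<Long, Integer> entry = rangeTable.floorEntry(numericKey);
                if (entry == null) {
                    throw new IllegalArgumentException("key below all configured ranges");
                }
                return entry.getValue();
            }
        }

        public static void main(String[] args) {
            // Hash sharding: route user "u42" to one of 16 shards.
            System.out.println(hashShard("u42", 16));

            // Range sharding: keys 0-999 -> shard 0, keys 1000 and above -> shard 1.
            RangeLookupRouter router = new RangeLookupRouter();
            router.addRange(0, 0);
            router.addRange(1_000, 1);
            System.out.println(router.shardFor(1_500)); // -> 1
        }
    }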

Availability

Data stores need to be highly available for read and write operations. Availability refers to a system or component that is continuously operational for a desirably long length of time. Below are some of the methods to ensure that the right architectural patterns, physical deployment, and rigorous operational procedures are in place for a highly available data store.

[Diagram of the four availability methods discussed in this blog post]

  • Multiple data center deployment: Distributed data stores must be deployed in different data centers with redundant replicas for disaster recovery. The geographical locations of data centers should be chosen cautiously to avoid network latency across the nodes. The ideal way is to deploy primary nodes equally amongst the data centers, along with local and remote replicas in each data center. Distributed data stores inherently reduce the downtime footprint by the sharding factor. In addition, equal distribution of nodes across data centers means that only 1/nth of the data is unavailable in case of a complete data center shutdown.

  • Self-healing tools: Efficient monitoring and self-healing tools must be in place to monitor the heartbeat of the nodes in the distributed data store. In case of failures, these tools should not only monitor but also provide a way to bring the failed component back to life, or a mechanism to bring up its most recent replica as the next primary. This self-healing mechanism should be used cautiously, per the application’s requirements. Some high-write-intensive applications cannot afford inconsistent data, which can change the role of self-healing tools to monitoring and alerting the application for on-demand healing instead.

  • Well-defined DR tiering, RTO, RPO, and SOPs: Rigorous operational procedures can bring the availability numbers (ratio of the expected value of the uptime of a system to the aggregate of the expected values of up and down time) to a higher value. Disaster recovery tiers must be well defined for any large-scale enterprise, with an associated expected downtime for the corresponding tiers. The Recovery Time Objective (RTO) and Recovery Point Objective (RPO) should be well tested in a simulated production environment to provide a predicted loss in availability, if any. Well-written SOPs are proven saviors in a crisis, especially in a large enterprise, where Operations can implement SOPs to recover the system as early as possible.

  • Application resiliency for data stores: Hardware fails, but systems must not die. Application resiliency is the ability of an application to react to problems in one of its components and still provide the best possible service. There are multiple ways an application can achieve high availability for read and write database operations. Application resiliency for reads enables the application to read from a replica in the case of primary failure (a minimal sketch of this idea follows). Resiliency can also be a built-in feature of the distributed data store, as in many of the NoSQL databases. When there is no data affinity between newly inserted data and the existing data, a round-robin insertion approach can be taken, where new inserts write to a node other than the primary when the primary is unavailable. On the contrary, when there is data affinity between newly inserted data and the existing data, the approach is primarily driven by the consistency requirements of the application.
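
As a sketch of the read-resiliency idea mentioned in the last item, the snippet below reads from the primary and falls back to replicas when the primary fails. The DataStoreClient interface is hypothetical; a real application would use its driver’s equivalent.

    import java.util.List;

    /** Hypothetical read path that falls back to replicas when the primary fails. */
    public class ResilientReader<T> {

        /** Minimal abstraction over a single data-store node (hypothetical interface). */
        public interface DataStoreClient<V> {
            V read(String key) throws Exception;
        }

        private final DataStoreClient<T> primary;
        private final List<DataStoreClient<T>> replicas;

        public ResilientReader(DataStoreClient<T> primary, List<DataStoreClient<T>> replicas) {
            this.primary = primary;
            this.replicas = replicas;
        }

        public T read(String key) {
            try {
                return primary.read(key);           // normal path: read from the primary
            } catch (Exception primaryFailure) {
                for (DataStoreClient<T> replica : replicas) {
                    try {
                        return replica.read(key);   // failover: try each replica in turn
                    } catch (Exception ignored) {
                        // keep trying the next replica
                    }
                }
                throw new IllegalStateException("primary and all replicas failed for key " + key);
            }
        }
    }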

The key takeaway is that in order to build a scalable and highly available data store, one must take a systematic approach to implementing the methods described in this post. This list of methods is mandatory but not exhaustive, and more methods can be added to it as needed. Plan to grow BIG and aim to be 24/7 UP, and with the proper scalability and availability measures in place, the sky is the limit.

References

Image by freeimageslive.co.uk – freebie.photography

Rheos

 

Data IS the next currency.  The increased demand for real-time data across almost every business and technology platform has changed the world we live in.  It is no different at eBay.

About two years ago, I was thrilled when I was asked to lead a development team to build a real-time data platform at eBay using Kafka. Initially, it was just for our Oracle change stream. In late 2015, we decided to expand it to a fully managed, secure, and easy-to-use real-time data platform, known as Rheos. The goal of Rheos is to provide a near real-time buyer experience, seller insights, and a data-driven commerce business at eBay.

While Kafka has given us core capabilities in stream processing, managing large, distributed, highly available, real-time data pipelines running on the cloud across security zones and data centers is hard without automation and core services. Hence, Rheos was built to provide the necessary life-cycle management, monitoring, well-architected standards, and ecosystem for real-time streaming data pipelines. Currently, the pipelines consist of Kafka, Storm, and stream processing applications. Both shared and non-shared data streams can run on these pipelines.

By the end of 2016, nearly 100 billion messages flowed through the pipelines in Rheos daily. In 2017, Rheos is expected to handle 15 times the current traffic.

So, how did we get there?

Concepts

At a very high level, Rheos has these concepts:

  • Data taxonomy is a well-defined convention that classifies and catalogs events into proper namespaces for organization, ease of discovery, and management purposes.
  • Category is a top-level component in a namespace for a given stream type, for example, monitoring events, click stream events, business events, and so on.
  • Stream captures the logical data flow that leads to a consumable data point in Kafka. The data flow may cut across one or more data points and stream processing units.
  • Domain represents a shard or a group of related topics for a given stream type. Topics in the group are subject to a set of control parameters such as max partitions, max replica, max data retention period, max topic count, and service level agreement, just as examples.
  • Namespace is used to classify the different data streams in Rheos. A namespace is composed of category, stream, and domain (an illustrative example follows this list).
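
Purely for illustration (the exact naming convention is not spelled out here), a namespace can be thought of as the three parts joined together; the category, stream, and domain values below are made up.

    /** Hypothetical illustration: a namespace composed of category, stream, and domain. */
    public record Namespace(String category, String stream, String domain) {
        @Override
        public String toString() {
            return category + "." + stream + "." + domain; // e.g. "behavior.clickstream.viewitem"
        }

        public static void main(String[] args) {
            System.out.println(new Namespace("behavior", "clickstream", "viewitem"));
        }
    }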

Automation

Lifecycle Management Service

Lifecycle Management Service is a cloud service that provisions and provides full lifecycle management (LCM) for Zookeeper, Kafka, Storm, and MirrorMaker clusters. It is built on a modular architecture with pluggable extensions and frameworks. This combination allows it to create and perform LCM on a stream pipeline running on any cloud platform (such as OpenStack, AWS, or Google Cloud). The Lifecycle Management Service allows you to provision a cluster, flex it up or down, or replace a bad node in it. In addition to its CLI, it is equipped with a RESTful API that allows the Rheos Management Service (see Core Service below) to perform simple operations on a guest instance. For example, the management service can do a rolling start on a troubled Kafka cluster via the Lifecycle Manager API.

Lifecycle Management Service’s architectural building blocks consist of these components:

  • API Server (REST and CLI) — a thin layer that parses, validates, and forwards requests to Task Manager
  • Task Manager (RPC) — a stateful service that creates and executes orchestration workflows on a cluster of nodes
  • Conductor — a component that is responsible for receiving heartbeat information from the guest instances
  • Guest Agent — A lightweight agent that runs on the guest instance; responsible for executing a command from the Task Manager on the instance as well as sending heartbeat metrics to the Conductor
  • Message Queue — a scoped, controlled, and secured way for the communication between the API Server, Task Manager, Conductor and the Guest Agent

The pluggable extension includes these functions:

  • Workflow
  • Monitoring and metrics emitter and aggregator
  • Authentication and authorization
  • Configuration management
  • IaaS (the underlying compute, storage, resource management, etc.)

Core Service

Rheos core service consists of the following components: Kafka Proxy Server, Schema Registry Service, Stream Metadata Service, and Management Service. The following picture captures how these components interact with each other.

Rheos Kafka Proxy Server

One of Rheos’ key objectives is to provide a single point of access to the data streams for the producers and consumers without hard-coding the actual broker names. This allows any open-source Kafka connector, framework, or Kafka client written in any programming language to seamlessly produce or consume in Rheos.

To do this, we created a Rheos Kafka Proxy Server that handles the Kafka TCP protocol so that the Proxy Server can intercept any initial connection requests from the clients. Upon receiving the initial connection requests, the Proxy Server identifies which Kafka cluster the topic resides on via the Rheos Metadata Service (described below). Then the actual broker CNAMEs are returned to the clients so that the clients can complete the final connection handshake with the brokers.

In addition, the Rheos Kafka Proxy Server allows operations to easily replace a bad node or move a topic from one Kafka cluster to another with little to no impact on the clients.
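
Concretely, a client only needs to point its bootstrap configuration at the proxy; the brokers returned in the metadata response are then used for the actual connections. The proxy hostname and topic below are placeholders, not real Rheos endpoints.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProxyBootstrapExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder address: the client only ever knows the proxy, never the broker names.
            props.put("bootstrap.servers", "rheos-proxy.example.com:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // The proxy resolves which cluster hosts the topic and returns the broker CNAMEs.
                producer.send(new ProducerRecord<>("behavior.clickstream.viewitem", "key", "value"));
            }
        }
    }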

Schema Registry Service

To promote data hygiene in Rheos and ease of use for both stream producers and consumers, each event in Rheos must be identifiable with an Avro schema. Rheos has built a Schema Registry Service based on the confluent.io Schema Registry. This service hosts data format definitions and provides schema versioning and serialization information for each event type. In addition, Rheos users can view, insert, and update the schemas in the registry.
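
For instance, an event type might be described by an Avro schema like the hypothetical one below, parsed here with the standard Avro library. The record name and fields are made up; actual schemas are managed through the Schema Registry Service.

    import org.apache.avro.Schema;

    public class EventSchemaExample {
        // Hypothetical event schema; real Rheos schemas live in the Schema Registry Service.
        private static final String SCHEMA_JSON = """
            {
              "type": "record",
              "name": "ViewItemEvent",
              "namespace": "behavior.clickstream",
              "fields": [
                {"name": "itemId",    "type": "long"},
                {"name": "userId",    "type": ["null", "string"], "default": null},
                {"name": "eventTime", "type": "long"}
              ]
            }
            """;

        public static void main(String[] args) {
            Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
            System.out.println(schema.getName() + " has " + schema.getFields().size() + " fields");
        }
    }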

Rheos Metadata Service

Stream Metadata Service provides a system of record for each stream and the associated producer(s) and consumer(s) that are known to the system. Prior to producing to or consuming from a stream, one must “register” the Kafka topic along with the associated schema, stream producer, and consumer with the Metadata Service. With this, Kafka topics and broker lists, along with the associated schemas, can easily be discovered or browsed via the Rheos REST API or Portal. More importantly, there is no hard coding of broker names in the client code! In addition, the Metadata Service also makes it possible for our Management Service and Health Check System to seamlessly monitor, alert, and perform life cycle management operations on streams and the infrastructure that the streams run on.

The recorded information includes the following items:

  • The physical (cluster) location of a topic or a stream processing job/topology
  • Data durability, retention policy, partition, producer, and consumer information
  • Source and target data mirroring information
  • Default configuration for Zookeeper, Kafka, and Storm
  • Topic schema information
  • And more

Management Service

Rheos performs stream, producer, and consumer life cycle management operations with a set of predefined Standard Operating Procedures (SOPs) in the Management Service. Each SOP has a series of steps that can be performed on a guest instance via the Lifecycle Management Service. For example, Operations can initiate a rolling restart of a Kafka cluster using one of the SOPs.

Health Check System

This service monitors the health of each asset (for example, a Kafka, Zookeeper, or MirrorMaker node) that is provisioned through the Lifecycle Management Service in these aspects:

  • Node state (up or down)
  • Cluster health
  • Producer traffic, consumer lags, or data loss

It periodically samples data from Kafka topics and performs consumer lag checks and end-to-end latency checks via the Management Service. Upon anomaly or error detection, the service generates an alert via email and/or to eBay Operations. In addition, the Health Check Service records a consumer’s current offset with a timestamp in the primary and the secondary Kafka clusters.

Producer traffic

Producer traffic is closely monitored and can be viewed on the Rheos Portal. To provide a quick visual of a producer’s traffic trend or pattern, the current traffic volume of a stream domain (that is, a topic group with selected or all partitions) is overlaid on top of the previous day’s traffic pattern. This way, one can quickly detect whether there’s an anomaly in the current traffic.

End-to-end latency

A popular question is what the end-to-end latency or consumer lag is in a stream pipeline. The Rheos Health Check System provides a stream domain’s end-to-end latency by measuring two periods of time (a minimal sketch of the first measurement follows this list):

  • From when an event is published to Kafka to the time when the event is consumed by a consumer
  • From when an event is published to Kafka to the time when the broker writes to disk
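
A minimal sketch of the first measurement is shown below: it relies on the timestamp Kafka attaches to each record on produce (assuming CreateTime timestamps) and compares it with the consume time. The bootstrap address, group id, and topic are placeholders.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class EndToEndLatencyCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "rheos-proxy.example.com:9092"); // placeholder
            props.put("group.id", "healthcheck-latency-sampler");           // placeholder
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("behavior.clickstream.viewitem")); // placeholder topic
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, String> record : records) {
                    // With CreateTime timestamps, record.timestamp() is the publish time.
                    long latencyMs = System.currentTimeMillis() - record.timestamp();
                    System.out.printf("partition=%d offset=%d latencyMs=%d%n",
                            record.partition(), record.offset(), latencyMs);
                }
            }
        }
    }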

Stream consistency check

To quickly remediate a problem in a stream, the Health Check System proactively monitors the set of in-sync replicas (ISR) for a given topic in a stream. In addition, it also ensures that the stream that the topic goes through is consistent across one or more Kafka clusters.

Node status

Last but not least, our Health Check System also monitors the state of each node in Rheos. At a high level, it provides a quick overview of cluster health by checking these conditions:

  • Whether a node is reachable or not
  • Whether the primary workload (broker, Zookeeper, etc.) is running on a reachable node
  • Whether a randomly selected node in a cluster can properly fulfil a request or not

Rheos Mirroring Service

In addition to Kafka’s cluster replication, the Rheos Mirroring Service provides high data availability and integrity by mirroring data from a source cluster to one or more target clusters. Built around Kafka’s MirrorMaker, the service is used to set up MirrorMaker instances and mirror a group of topics from one cluster to another via a REST API. Through the API, one can start and stop the mirroring of a topic group.

Rheos Mirroring Service consists of these key components:

  • Asset Agent is co-located on a mirroring compute node and responsible for reporting heartbeat metrics to a State Store.
  • Mirror Manager is a REST service that starts and stops the mirroring of a topic group. It is equipped with the intelligence to properly distribute the MirrorMaker instances across the cluster based on a distribution strategy.
  • Configurator is an Ansible playbook that resides on each MirrorMaker node. It is responsible for these functions:
    • Creating the required Kafka producer/consumer properties for a topic group (an illustrative sketch of such properties follows this list)
    • Creating the required directory structure for the instance along with the supervisor configuration
    • Starting or stopping the MirrorMaker instance based on the given source to target mirroring configuration
  • Mirror Bootstrap is a thin Java wrapper that registers and deregisters the MirrorMaker instance in the State Store prior to interacting with the underlying Mirror Maker instance. This allows us to capture the physical and the logical data mirroring activities.
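
As an illustrative (not actual Rheos) sketch of the kind of properties the Configurator renders for one topic group, the snippet below assembles minimal consumer and producer settings plus a topic whitelist; all addresses and names are placeholders.

    import java.util.Properties;

    /** Illustrative MirrorMaker-style configuration for mirroring one topic group. */
    public class MirrorConfigExample {
        public static void main(String[] args) {
            // Consumer side: read from the source cluster.
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "source-kafka.example.com:9092"); // placeholder
            consumerProps.put("group.id", "mirror.behavior.clickstream");            // placeholder

            // Producer side: republish to the target cluster, favoring durability.
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "target-kafka.example.com:9092"); // placeholder
            producerProps.put("acks", "all");

            // A whitelist (regex) selects the topics in the group to be mirrored.
            String whitelist = "behavior\\.clickstream\\..*"; // placeholder topic group

            // These would typically be written out as properties files and handed to a
            // MirrorMaker instance (e.g. via --consumer.config, --producer.config, --whitelist).
            System.out.println(consumerProps + " / " + producerProps + " / " + whitelist);
        }
    }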

Using the Mirroring Service to achieve high availability

As shown below, data can be mirrored from one region or availability zone to one or more regions or availability zones for high availability. To do that, MirrorMaker instances are set up in the target locations to consume data from a source cluster and subsequently publish to the target clusters.

Using the Mirroring Service to move data across security zones

In addition, Data Mirroring is used to provide data movement from one security zone to another. As shown below, MirrorMaker instances are set up in the target security zone to consume data from the source security zone over a TLS connection and subsequently publish the received data to the target clusters.

How to access Kafka securely?

To acquire a broker connection, a Rheos client must be authenticated by the eBay Identity Service via the Kafka SASL mechanism. Upon authentication, the client is then further authorized through Kafka’s default pluggable Authorizer via Zookeeper.

In some cases, such as moving data across security zones, TLS is also enabled at the connection level.
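
As a rough sketch of what a client’s security-related settings might look like (the exact SASL mechanism and login module used with the eBay Identity Service are not specified here, so they are assumptions), consider the following; the address and credentials are placeholders.

    import java.util.Properties;

    public class SecureClientConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "rheos-proxy.example.com:9093"); // placeholder
            // SASL authentication over TLS; the mechanism shown here is an assumption.
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"app-client\" password=\"app-secret\";"); // placeholders
            // Authorization is then enforced by Kafka's pluggable Authorizer (ACLs via Zookeeper).
            System.out.println(props);
        }
    }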

Conclusion

Rheos has opened a new chapter at eBay in many respects. With Rheos, eBay data can now be securely extracted and moved from a data store, application, or other source to one or more locations in real time. Stream processing has opened up new possibilities for eBay’s businesses: fraud detection, monitoring, analytics, and more.