ql.io APIs for eBay Marketplaces

We are pleased to announce the release of eBay Marketplaces APIs based on ql.io:

https://github.com/ql-io/ql.io-ebay-mp-apis

We have created tables for most of the calls for the eBay Finding, Shopping, and Trading APIs. The fields required in the API calls are mapped using either EJS (Embedded JavaScript) or Mustache templates. You can use the tables as they are, or you can create your own tables and routes for the specific fields and calls that your application will need.

To use the tables that we have provided, clone the above repository. Then run

make clean install

You need to have the API key and authentication credentials for the particular API. You can get the API key at https://www.x.com/developers/ebay. Simply enter the appropriate values in the dev.json file in the config directory.

The test directory includes examples of various API calls. As described at http://ql.io/ we support different SQL-like statements – select, insert, delete, etc. We also support various HTTP verbs, such as GET, PUT, POST, and DELETE, with the SQL-like statements.

Usage scenarios

As examples of how the APIs can be used, here are three different scenarios. You can run each of the statements shown individually, and the call will return a “Success” message. Alternatively, you can use the UI to test if the calls have been successful.

SCENARIO 1:  WatchList add/remove (buyer’s perspective)

Here we use a keyword to search for items. For each item matching the keyword, we obtain the item ID and use it to add the item to the watchlist.

itemid = select searchResult.item[0].itemId from finding.findItemsByKeywords where keywords = 'camera';

insert into trading.AddToWatchList (ItemID) values ("{itemid}");

We can also remove an item from the watchlist:

return delete from trading.RemoveFromWatchList where ItemID in ("{itemid}");

SCENARIO 2:  AddItem + PlaceOffer (buyer’s + seller’s perspective)

This scenario uses the POST method with the AddItem call of the trading API. Here we pass the entire XML as the input string, which the ql.io engine will directly pass to the API gateway. This method is referred to as “passing opaque parameters” in the documentation at http://www.ql.io/docs/insert.

insert into trading.AddItem values ('<?xml version="1.0" encoding="utf-8"?>
   <AddItemRequest xmlns="urn:ebay:apis:eBLBaseComponents">
      <ErrorLanguage>en_US</ErrorLanguage>
      <WarningLevel>High</WarningLevel>
      <Item><Title>Best book</Title>
         <Description>This is the best book. Super condition!</Description>
         <PrimaryCategory>
            <CategoryID>377</CategoryID>
         </PrimaryCategory>
         <StartPrice>1.00</StartPrice>
         <ConditionID>3000</ConditionID>
         <CategoryMappingAllowed>true</CategoryMappingAllowed>
         <Country>US</Country>
         <Currency>USD</Currency>
         <DispatchTimeMax>3</DispatchTimeMax>
         <ListingDuration>Days_7</ListingDuration>
         <ListingType>Chinese</ListingType>
         <PaymentMethods>PayPal</PaymentMethods>
         <PayPalEmailAddress>magicalbookseller@yahoo.com</PayPalEmailAddress>
         <PictureDetails>
            <PictureURL>http://i.ebayimg.sandbox.ebay.com/00/s/MTAwMFg2NjA=/$(KGrHqJHJEsE-js(zPJ)BP)cWCLLSQ~~60_1.JPG?set_id=8800005007</PictureURL>
         </PictureDetails>
         <PostalCode>95125</PostalCode>
         <Quantity>1</Quantity>
         <ReturnPolicy>
            <ReturnsAcceptedOption>ReturnsAccepted</ReturnsAcceptedOption>
            <RefundOption>MoneyBack</RefundOption>
            <ReturnsWithinOption>Days_30</ReturnsWithinOption>
            <Description>
               If you are not satisfied, return the book for a refund.
            </Description>
            <ShippingCostPaidByOption>Buyer</ShippingCostPaidByOption>
         </ReturnPolicy>
         <ShippingDetails>
            <ShippingType>Flat</ShippingType>
            <ShippingServiceOptions>
               <ShippingServicePriority>1</ShippingServicePriority>
               <ShippingService>USPSMedia</ShippingService>
               <ShippingServiceCost>2.50</ShippingServiceCost>
            </ShippingServiceOptions>
         </ShippingDetails>
         <Site>US</Site>
      </Item>
      <RequesterCredentials>
         <Username>ql.io-test1</Username>
         <Password>ebay</Password>
      </RequesterCredentials>
   </AddItemRequest>') ; 

return insert into trading.PlaceOffer (ErrorLanguage,EndUserIP,ItemID,Offer.Action, Offer.MaxBid, Offer.Quantity ) values ("en_US", "192.168.255.255", "200002581483", "Bid", "20.00","1"); 

The item ID passed to PlaceOffer is obtained after the item is added. It can later be used to revise the item listing (for example, to expand the item description).

SCENARIO 3:  SellingManagerProduct add/remove (seller’s perspective)

In this final example, we use Selling Manager product calls of the ql.io-based Trading API. Here, we use columns and name-value pairs (the first of the insert methods described in http://www.ql.io/docs/insert) to add a product:

result = insert into trading.AddSellingManagerProduct
         (Version,
          RequesterCredentials.eBayAuthToken,
          SellingManagerProductDetails.ProductName,
          SellingManagerProductDetails.QuantityAvailable,
          SellingManagerProductDetails.FolderID)
         values
         ("737",
          "MyAuthToken",
          "Harry Potter Book4",
          "50",
          "4651612545");

prodID = "{result.SellingManagerProductDetails.ProductID}";

And here, we delete a product:

return delete from trading.DeleteSellingManagerProduct where ProductID = "{prodID}";

Try it yourself!

Go ahead and fork https://github.com/ql-io/ql.io-ebay-mp-apis, and play with the tables and routes. For an overview and FAQs, see the readme. You can also ask questions and join the community discussion in the ql.io group.

 

Grid Computing with Fault-Tolerant Actors and ZooKeeper

The actor model of computation has gained in reputation over the past decade as engineers have learned to grapple with concurrent and distributed systems to achieve scalability and availability.

I was in the zone about a year ago developing a job scheduling framework for an application called Nebula, which we use to manage keywords and SEM campaigns. A simple actor library with durable mailboxes implemented over Apache ZooKeeper seemed like a good fit at the time. Looking back, I can say that the solution has worked well. At the time, we had some good reasons (e.g., special throttling requirements) to develop our own framework instead of customizing an existing one. Faced with the same decision today, I would seriously consider building something around Akka.

We developed two primary patterns to make actors highly reliable. The first is to model critical actor state and mailboxes in ZooKeeper. The second is to use the ZooKeeper watch mechanism for actor supervision and recovery.

Example:  job type limiter

To explain how these patterns work, let’s look at the design of a specific actor, the job type limiter. Each job is of a given type, and type limits are set by an administrator to prevent too many jobs of a certain type from running simultaneously in the grid. The type limiter is a singleton in the compute grid and ensures that job type limits are enforced.

For example, let’s assume we have jobs that invoke third-party services to update the maximum bid for keyword ads. The call capacity is limited, so while the grid has enough slots to run hundreds of jobs, let’s say we can run only 10 concurrent jobs against a partner API. The type limiter ensures that the 11th job waits until one of the 10 that are running has completed.

For each job type, the type limiter keeps track of the running count and the limit. When the running count hits the limit, the gate is closed until a job of the corresponding type terminates. So how do we ensure this critical state is maintained correctly even if you cut the power cord to the system on which the type limiter is running? More importantly, how do we eliminate drift to prevent our running counts from being off by one? If a job host crashes, how do we guarantee that the type limiter receives a message indicating the job has ended?

Actors, mailboxes, queues, and watches

An actor receives and acts upon messages sent to its mailbox. Implementing an actor is quite easy because there are no locks to contend with – you get a message, handle it, and optionally send new messages or start new actors as a side-effect. But the mailbox, the limits, and the counter values must be durable and highly available for the type limiter to accurately and efficiently keep track of running counts in a fault-tolerant manner.

To model actor mailboxes, I started with a simple queue in ZooKeeper. A set of persistent sequential znodes represent messages, and their parent represents the queue. In order to wake up an actor when new messages arrive in the queue, I prepare another znode called a touch-pad. A sending actor creates the message znode, and then invokes setData() on the touch-pad to trigger a watch. The receiving actor establishes a data watch on the touch-pad and, when the watch is triggered, invokes getChildren() on the queue to collect messages.
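To make the mailbox concrete, here is a minimal sketch of the send path using the ZooKeeper Java API. The znode paths, the "msg-" prefix, and the class name are illustrative assumptions, not the actual Nebula code.

import java.nio.charset.StandardCharsets;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative send path: persist the message as a sequential child of the
// queue znode, then update the touch-pad to trigger the receiver's data watch.
public class MailboxSender {
    private final ZooKeeper zk;
    private final String queuePath;     // e.g. /nebula/actors/type-limiter/queue (hypothetical path)
    private final String touchPadPath;  // e.g. /nebula/actors/type-limiter/touchpad (hypothetical path)

    public MailboxSender(ZooKeeper zk, String queuePath, String touchPadPath) {
        this.zk = zk;
        this.queuePath = queuePath;
        this.touchPadPath = touchPadPath;
    }

    public void send(String message) throws KeeperException, InterruptedException {
        // 1. Create a persistent sequential znode holding the payload;
        //    ZooKeeper appends the 10-digit sequence suffix to "msg-".
        zk.create(queuePath + "/msg-",
                  message.getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE,
                  CreateMode.PERSISTENT_SEQUENTIAL);
        // 2. Touch the touch-pad (version -1 accepts any version). The receiver
        //    holds a data watch on this znode and wakes up when it changes.
        zk.setData(touchPadPath, new byte[0], -1);
    }
}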

 After retrieval from ZooKeeper, the messages are ordered by sequence number and entered into a local queue for processing by the target actor. The actor keeps track of messages it has received by writing the message sequence number back to the queue znode. It does so by calling the asynchronous setData() before actually handling the message. The reason for writing ahead is that messages are assumed to be idempotent; if processing led to an error that prevented the actor from writing the current message sequence number back to the queue, the actor may end up in an infinite recovery cycle: get-message, handle-message-fail, read-recover, get-message, handle-message-fail,….
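The receive side might look roughly like the following sketch, again with illustrative names. The important detail is that the sequence number is written back to the queue znode, asynchronously, before the message is handled.

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Illustrative receive path: collect children, order them by sequence suffix,
// and write the sequence number back ahead of handling each message, so a
// failing handler cannot trap the actor in an endless recovery cycle.
public class MailboxReceiver {
    private final ZooKeeper zk;
    private final String queuePath;
    private long lastSeq; // last sequence number written back to the queue znode

    public interface MessageHandler {
        void handle(String message); // assumed idempotent
    }

    public MailboxReceiver(ZooKeeper zk, String queuePath, long lastSeq) {
        this.zk = zk;
        this.queuePath = queuePath;
        this.lastSeq = lastSeq;
    }

    public void drain(MessageHandler handler) throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren(queuePath, false);
        Collections.sort(children); // zero-padded suffixes sort correctly as strings
        for (String child : children) {
            long seq = Long.parseLong(child.substring(child.lastIndexOf('-') + 1));
            if (seq <= lastSeq) {
                continue; // already acknowledged on a previous pass
            }
            byte[] payload = zk.getData(queuePath + "/" + child, false, new Stat());
            // Write-ahead: record the sequence number asynchronously before handling.
            zk.setData(queuePath, Long.toString(seq).getBytes(StandardCharsets.UTF_8), -1,
                       (rc, path, ctx, stat) -> { /* a real implementation would log failures */ },
                       null);
            lastSeq = seq;
            handler.handle(new String(payload, StandardCharsets.UTF_8));
        }
    }
}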

Though writing back the sequence number ahead of processing prevents the infinite cycle, it’s possible that the message isn’t properly handled at all. What if the actor crashes right after the sequence number has been written, but before the message has been processed? There are different ways to recover from a situation like this. In our case the scheduling framework holds enough extra state in ZooKeeper to allow actors to recover and eventually self-heal. In some cases, an actor will rely on senders to repeat a message; in others, it will check its current state against expected values and take corrective measures if necessary. If an actor hasn’t received any messages for some time, it might run an admin cycle where it checks its state against expected values in its environment, or publishes its assumptions (sends a message) so other actors can confirm.

More concretely, the type limiter doesn’t just keep a running count for each job type. It also records the job handle for each job that it lets out of the gate. In fact, it records the handle in ZooKeeper before it lets the job out of the gate. The job handle encodes a reference to a job’s state (e.g., command) and a run ID. Each time a job is scheduled, its run ID is incremented, so the handle for each run is unique. The framework also serializes runs for the same job (in case of reruns or retries).
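As a hedged sketch, if the running jobs of each type are modelled as children of a per-type znode, the gate check could look something like this. The paths, the handle format, and the class name are assumptions for illustration; since the type limiter is a singleton, the check-then-create step does not race with other writers.

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative gate: running job handles are durable children of a per-type
// znode, recorded before the job is allowed out of the gate so a restarted
// type limiter can rebuild its running counts from ZooKeeper.
public class TypeLimiterGate {
    private final ZooKeeper zk;
    private final String basePath; // e.g. /nebula/type-limiter (hypothetical path)

    public TypeLimiterGate(ZooKeeper zk, String basePath) {
        this.zk = zk;
        this.basePath = basePath;
    }

    /** Returns true if the job may run now, false if it must wait for a free slot. */
    public boolean tryAdmit(String jobType, String jobHandle, int limit)
            throws KeeperException, InterruptedException {
        String typePath = basePath + "/" + jobType;
        List<String> running = zk.getChildren(typePath, false);
        if (running.size() >= limit) {
            return false; // gate closed until a job of this type terminates
        }
        // Record the handle durably, then open the gate.
        zk.create(typePath + "/" + jobHandle, new byte[0],
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        return true;
    }

    /** Called when a job-ended message arrives for this handle. */
    public void release(String jobType, String jobHandle)
            throws KeeperException, InterruptedException {
        zk.delete(basePath + "/" + jobType + "/" + jobHandle, -1);
    }
}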

Other actors, called job monitors – one per host – are responsible for monitoring jobs running locally. They know whether a job process is still alive and progressing. Job monitors can confirm whether a job handle recorded by the type limiter refers to a job that is actually running. They also emit messages when they’re idle to indicate no jobs are running so actors like the type limiter can validate their state. If the whole cluster is idle, the type limiter verifies that there are no job handles in its records. In this manner, the type limiter may from time to time suffer a temporary glitch where its count might be off by one, but it quickly recovers and makes adjustments as it verifies its assumptions with help from other actors.

Maintaining state

There are aspects to programming actors that turn out to be non-trivial, especially if the application relies on consistent state to make decisions. Asynchronous message exchanges among many actors across a grid can easily lead to inconsistent views (or views that are eventually consistent, but consistent too late). While ZooKeeper guarantees ordered processing, and messages are delivered to actors in sequence by the library (you could say we are deviating from the pure actor model here), an actor can easily fall out of synch if it doesn’t carefully maintain extra, durable state for recovery and healing, as the type limiter demonstrates.

An actor may expect an acknowledgment after sending a message. But if it doesn’t receive any, it gives up after a timeout, and assumes the receiver or the network is unavailable. In such circumstances it is tempting to retry immediately, but better to back off for a while before retrying. In a larger cluster with a lot of asynchronous communication between actors, congestion or delays in processing are quickly ameliorated if actors back off immediately and try again after a random period of time. Throughput suffers a little, but overall stability and fault tolerance are improved – not to mention the quality of uninterrupted sleep for certain humans.

What happens when the host on which the type limiter is running shuts down or crashes? A supervisor takes note and starts a new actor on another system. The new type limiter instance reads the current message sequence number, loads all job handles, calculates the running counts by job type (i.e., refreshes its caches), reads and deletes messages with sequence numbers less than or equal to the current sequence number, and then starts processing new messages – writing back the sequence number before it handles each message. Got it; but how does it really work? What if the supervisor dies?

Before diving in further, this is a good place to give a shout-out to the folks who designed, developed and maintain ZooKeeper. I think it’s one of the most useful open-source contributions for distributed computing of the past decade. Most of the fault tolerance in Nebula can be reduced to simple patterns that make use of the ZooKeeper model (e.g., ephemeral, session-bound znodes), its serialized atomic updates, and the associated watch mechanism.

Ephemeral znodes, master election, and actor supervision

Now let’s examine actor supervision, the second of the two patterns, and one that relies on ZooKeeper’s ephemeral znodes and watch mechanism. We start with master election, where any number of participants compete to attain mastership for a certain role. You can think of these participants as little actors that come to life under prescribed circumstances – for example, when a system starts up. We require that each election participant run on a different host in the grid.

Let’s consider the role that gives the master the privilege of starting a type limiter. Each role is uniquely identified by a well-known path in ZooKeeper where ephemeral sequential znodes are created by election participants. The winner is the participant who creates the znode with the lowest sequence number. This participant is elected master and acts as a local watchdog responsible for the life cycle of the main actor, in this case the type limiter. If the system shuts down cleanly, the type limiter terminates normally, and the master deletes its znode in ZooKeeper.

The deletion triggers a watch that’s established by another participant – we call it the slave. The slave is the participant who creates the znode with the second-lowest sequence number during the election. Instead of walking away, the slave establishes a watch on the master’s znode and acts like a supervisor for the master and its main actor. When the watch is triggered, the slave is notified, and it immediately takes over the master role by starting a new instance of the type limiter. In other words, the slave mutates into a master and now acts as a watchdog over the type limiter.
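A minimal election sketch using ephemeral sequential znodes and an exists() watch is shown below. The paths and class name are illustrative, and the real framework handles edge cases (such as the master znode disappearing between getChildren() and exists()) that this sketch glosses over.

import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative election for a role such as the type limiter: the lowest
// sequence number wins mastership and starts the main actor; the runner-up
// becomes the slave and watches the master's ephemeral znode.
public class RoleElection {
    private final ZooKeeper zk;
    private final String rolePath; // e.g. /nebula/election/type-limiter (hypothetical path)

    public RoleElection(ZooKeeper zk, String rolePath) {
        this.zk = zk;
        this.rolePath = rolePath;
    }

    public void join(Runnable startMainActor) throws KeeperException, InterruptedException {
        String myZnode = zk.create(rolePath + "/p-", new byte[0],
                                   ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String myName = myZnode.substring(myZnode.lastIndexOf('/') + 1);

        List<String> participants = zk.getChildren(rolePath, false);
        Collections.sort(participants);

        if (participants.get(0).equals(myName)) {
            // We won: act as the local watchdog and start the main actor.
            startMainActor.run();
        } else if (participants.size() > 1 && participants.get(1).equals(myName)) {
            // We are the slave: watch the master's znode. The watch fires whether
            // the znode is deleted cleanly or expires with the master's session.
            String masterPath = rolePath + "/" + participants.get(0);
            zk.exists(masterPath, event -> {
                if (event.getType() == EventType.NodeDeleted) {
                    startMainActor.run(); // mutate into the master
                }
            });
        }
        // Participants with higher sequence numbers simply walk away.
    }
}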

 If a master and its main actor die abruptly because their host crashes, their ZooKeeper session eventually times out, and ZooKeeper automatically deletes the corresponding ephemeral znode. The watch is triggered, and the slave takes over as described above. Except for a minor delay due to the session timeout, and the possibility of incomplete message handling by a crashing actor, the fail-over and recovery are the same. Bring on the chaos monkeys and see if you can tell anything unusual is happening in the grid – peeking at the logs is cheating.

We’re not done yet. There’s another actor called a grid monitor, which comes to life at random intervals on each host in the grid. It checks the health of local actors and then participates in an election to run a grid-level scan. One of the critical health checks is to ensure every actor has at least one active master and one active slave. The grid monitor doesn’t communicate with other hosts; it just scans ZooKeeper to see if the expected master and slave znodes exist for each role. If it detects a missing slave (not uncommon after a host goes down and associated slaves take over master roles), it starts up a new participant, which can take on the slave’s supervisor role (unless the master for the same role is running on the local host). With the grid monitor’s help, we can ensure that most fail-overs are handled immediately. Even in the rare case where a slave and then its master are terminated in short order, grid monitor instances will bring them back to life on other hosts within a few seconds.

Actor supervision in Nebula is thus quite similar in principle to the supervisor hierarchies defined in Erlang OTP. The master is the immediate local supervisor acting as a watchdog, the slave operates at the next level and runs on a different host to support rapid fail-over, and the grid monitor acts like a meta-supervisor across the whole grid. One of the most useful properties of the Nebula grid is that all hosts are created equal. There is no single point of failure. An actor can run on any host, and the grid monitor instances ensure the load is balanced across the grid.

Sequential znode suffix limits

Let’s look at one more detail regarding durable mailboxes in ZooKeeper. If you’ve used ZooKeeper, you know that the names of sequential znodes carry a 10-digit numeric suffix. A counter stored in the parent is incremented each time a new child znode is created. If you rely on ZooKeeper sequence numbers to order messages in a queue, it’s easy to see that a busy queue may eventually exhaust the counter, which is a signed 32-bit integer capped at 2^31-1. You can deal with this limitation in different ways.

For example, if the grid runs idle from time to time, an actor can delete and recreate its mailbox after the sequence number reaches a certain threshold. This will reset the sequence number to zero. Another solution would be to use different queues and a signal that indicates to senders which one is active. One might also consider implementing a mailbox as a fixed-size ring buffer with no dependency on sequence numbers; however, before sending a new message, senders would have to determine which znodes are free and which have yet to be processed.

In any case, setting an upper bound on the number of messages in a mailbox is a good idea. Otherwise, a group of senders could easily overwhelm a receiver. One way to do this is to have the message dispatcher invoke getData() on the queue’s znode before sending a message. The returned value could specify the queue’s message limit, and the dispatcher could retrieve the number of messages via getNumChildren() on the znode’s stat object and compare it with the limit. If the number of children had already reached the limit, the dispatcher would back off, and the sender would try again after some delay. Again, we trade off a little throughput for higher resilience with a simple backpressure mechanism.
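A small sketch of that capacity check follows, assuming (purely for illustration) that the queue znode's data encodes the limit as a plain decimal string; in practice the data would also have to carry the write-back sequence number described earlier, for example as a small serialized structure.

import java.nio.charset.StandardCharsets;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Illustrative backpressure check: the queue znode's data is assumed to hold
// the mailbox limit, and the Stat returned alongside it reports how many
// message children currently exist.
public final class MailboxBackpressure {

    private MailboxBackpressure() {}

    public static boolean hasCapacity(ZooKeeper zk, String queuePath)
            throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        byte[] data = zk.getData(queuePath, false, stat);
        int limit = Integer.parseInt(new String(data, StandardCharsets.UTF_8).trim());
        return stat.getNumChildren() < limit;
    }
}

If hasCapacity() returns false, the dispatcher backs off and the sender retries after a random delay.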

Optimizing for short messages

We haven’t discussed how to store the message payload, and though it may be obvious that each znode in the queue can have data representing payload, I should highlight an optimization that works for very small messages. If your message can be encoded in a short string, say less than 50 characters, you can store it as a prefix in the znode name. On the receiving end, the messages can then be retrieved with a single call to getChildren() without extra calls to getData() on each child.
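For illustration, here is a sketch of that optimization with the payload encoded directly in the znode name; the payload must of course be a legal znode name fragment (no '/' characters), and the separator and class name are assumptions.

import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative short-message optimization: the payload rides in the znode name
// itself, so a receiver needs only getChildren() and no per-message getData().
public final class ShortMessages {

    private ShortMessages() {}

    public static void send(ZooKeeper zk, String queuePath, String shortPayload)
            throws KeeperException, InterruptedException {
        // e.g. "JOB_DONE:12345|" plus the sequence suffix appended by ZooKeeper.
        zk.create(queuePath + "/" + shortPayload + "|", new byte[0],
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
    }

    public static List<String> receive(ZooKeeper zk, String queuePath)
            throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren(queuePath, false);
        Collections.sort(children); // the sequence suffix still orders the messages
        return children;            // the payload is everything before the '|' separator
    }
}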

Design considerations

By now you’ve probably concluded that ZooKeeper doesn’t really provide an optimal foundation for messaging in general. I readily admit that your assessment is correct. If you’re dealing with a large grid where, say, tens of millions of messages are exchanged per day, or the message payloads are large, look for another solution. Also keep in mind that ZooKeeper doesn’t scale horizontally. It was designed as a highly available coordination service with very specific guarantees that limit its scalability. So if you’re building a massive grid, consider partitioning into autonomous cells and coordinate locally as much as possible. Global coordination, especially across data centers, requires special treatment.

The Nebula grid is relatively small (about 100 virtual hosts), and we usually have no more than 10 concurrent jobs running on any given host, so we are coordinating the progress of at most 1000 jobs at a time. For a grid of this size and this kind of workload, our actor-based scheduling framework performs very well. The overhead of scheduling usually amounts to less than 5% of overall processing. We run ZooKeeper out of the box on an ensemble of five basic servers, and we use a database as an archive, where the state of completed jobs can be looked up if necessary. A simple REST service is provided for job submission and monitoring.

Identifying and Eliminating Extra DB Latency

I would like to share my experiences with troubleshooting a recent issue on Oracle databases in our datacenters. While the details in this post involve batch jobs, the lessons learned can apply to any business having Oracle SQL traffic between two or more datacenters.

The problem

We noticed that the running time on a set of batch jobs had suddenly increased substantially. Some increase was understandable, since we had moved a significant percentage of the database hosts accessed by these batch jobs to another eBay data center. However, the latency had jumped beyond what had been expected or could be explained.

The investigation

The first step was to check our logs for the most expensive SQL statements.

For batch jobs, it is very easy to distinguish intra-colo transactions (client and database in the same datacenter) from cross-colo transactions, since all batch jobs run on boxes located in the same datacenter. I immediately found a discrepancy between the SQL times for intra-colo calls and cross-colo calls. I expected to see a consistent difference of 20 ms for both minimum and average times, since 20 ms is the round-trip network latency between the two datacenters involved. In reality, the differences far exceeded 20 ms. The following table shows the statistics collected for approximately 8,500 calls in one hour:

Statistic                    Intra-Colo            Cross-Colo
Exec Count                   8600                  8490
Min/Max/Avg (ms)             1.00/119.00/5.70      21.00/487.00/36.25
Total Time (ms)              49028.00              307758.00

On further investigation, I discovered an additional complication. For some queries, the time difference was consistently 20 ms, and for other queries it was consistently 40 ms.

Next, I looked into database performance. I found that the query execution time was the same on all of our database hosts, and that the issue was reproducible independent of the batch jobs, using only tnsping and the Oracle SQL client.

At this point, I no longer suspected a database issue at all, since the statistics showed the same results for all DB hosts in the datacenter. Instead, I suspected an issue related to either the network or network security.

A possible firewall capacity problem was ruled out because firewall latency contribution is constant, and much lower than the latency introduced by the network. When we looked at the network, we saw that ping consistently showed 20 ms, regardless of the time of day or the traffic activity. Of course, in real life we do not merely exchange pings, but rather use actual SQL queries to communicate with DB hosts, so we looked at a tcpdump analysis of the DB hosts next. 

As the root cause of our problem was still unknown, there was no guarantee that the issue could be reproduced by using tcpdump while running queries manually. After trying twice and not getting enough information, we concluded the only reliable way to reproduce the issue was to run tcpdump while the batch job was running.

The last thing to do was to wait until the next high-volume batch run. Finally, we got enough data, and we almost immediately saw an answer (saving us from having to dive even deeper and look at MTU or window size as the cause of additional round trips).  Here is the analysis of the packets for a single query:

No.  Time     Source  Destination  Protocol  Length  Info
1    0.000000 [IP1]   [IP2]        TNS       549     Request, Data (6), Data
2    0.000714 [IP2]   [IP1]        TNS       852     Response, Data (6), Data
3    0.021257 [IP1]   [IP2]        TNS       71      Request, Data (6), Data
4    0.021577 [IP2]   [IP1]        TNS       153     Response, Data (6), Data

Processing the query required four packets spanning about 21.6 ms at the capture point. Since the round-trip time (RTT) between the datacenters is about 20 ms, we add roughly 10 ms of transit time before packet 1 (from the first datacenter to the second, where the capture was taken) and another 10 ms after packet 4 (back to the first datacenter). The total elapsed time for these four packets was therefore about 41.6 ms, or two full round trips.

We saw that the SELECT statement was in packet 1 and that the response appeared to be in packet 2. We didn’t know what packets 3 and 4 were, but their small size suggested they were some kind of acknowledgement.

The solution

At this point, we had confirmation that for each query, there were four network packets instead of two. The fact that every execution took two round trips indicated that more soft-parsing was occurring than was necessary. The question was, why were commonly running queries getting parsed multiple times? Once parsed, a query should be added to the app server statement cache. We increased the statement cache size parameter, which dictates how many parsed SQL statements can remain in cache at a given time, from 80 to 500.
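For illustration only, here is roughly what enabling and sizing the Oracle JDBC driver's implicit statement cache looks like; our actual change was the equivalent pool-level parameter in the application server, and the connection URL, credentials, and table below are placeholders.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import oracle.jdbc.OracleConnection;
import oracle.jdbc.pool.OracleDataSource;

// Sketch: with implicit statement caching enabled and a generous cache size,
// a repeated PreparedStatement is parsed once and then served from the cache,
// avoiding the extra parse round trip on subsequent executions.
public class StatementCacheExample {
    public static void main(String[] args) throws Exception {
        OracleDataSource ds = new OracleDataSource();
        ds.setURL("jdbc:oracle:thin:@//db-host.example.com:1521/SVC"); // placeholder
        ds.setUser("app_user");        // placeholder
        ds.setPassword("secret");      // placeholder
        ds.setImplicitCachingEnabled(true); // turn on implicit statement caching

        try (Connection conn = ds.getConnection()) {
            ((OracleConnection) conn).setStatementCacheSize(500); // our setting went from 80 to 500

            // The same statement text now reuses the cached cursor on each execution.
            String sql = "SELECT status FROM batch_jobs WHERE job_id = ?"; // placeholder table
            for (long jobId : new long[] {1L, 2L, 3L}) {
                try (PreparedStatement ps = conn.prepareStatement(sql)) {
                    ps.setLong(1, jobId);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            System.out.println(jobId + " -> " + rs.getString(1));
                        }
                    }
                }
            }
        }
    }
}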

When the tuned settings went into effect, we were able to eliminate an extra cross-colo round trip for a majority of SQL requests. We immediately saw a positive impact, observing a 30% reduction in execution time for all instances of a batch job.

This experience demonstrates how even small changes in DB settings can positively affect execution response times. And for me, it was both challenging and interesting to drive problem resolution in an area where I did not have much prior experience.