Engineering Blog

Automating Db2Kafka’s Failover

by David Rusu August 27, 2017 | 8 min read

Db2Kafka is an open source service developed at PagerDuty. It is a message pump to take records written to a MySQL table and publish them to a Kafka topic. While we realize it’s not ideal to be using an SQL table as a queue, it has been a useful tool as we are able to keep our Rails code transactional while the rest of our systems become more asynchronous.

Initially we developed this service as a temporary workaround to be used until we have time to restructure our non asynchronous systems, but, as it’s usage has grown and more features are depending on it, we’ve decided it merits some time spent to make it more robust and resilient to failure. This post will go over how we’ve done this by adding support for automatic failover across datacenters to Db2Kafka.

A Bit of History

Db2Kafka has been used in production at PagerDuty for over a year now, it’s been used to move messages for services not part of the notification pipeline. Features such as alert timelines and user activity summaries were powered by Db2Kafka. Due to some upcoming changes to our infrastructure, Db2Kafka will start taking traffic for services in the notification pipeline, moving it into the realm of our “tier 1” critical infrastructure.

However, there is a complication. For performance reasons, Db2Kafka was designed to run as a singleton – only one instance of the service is expected to be running anywhere in our production system. In this new world where Db2Kafka must have strong uptime guarantees, a problem occurring with this single instance would mean downtime for our notification pipeline, which isn’t acceptable!

To solve this problem, we need to allow for the possibility of downtime of a Db2Kafka instance without loss of availability. We’ll have to introduce a second instance which will take over when we encounter failures. We have a few requirements on how this should work:

  1. Db2Kafka remains a singleton (only one instance of it is operating anywhere at a given moment).
  2. Ideally, Db2Kafka runs in the same datacenter as the database it’s reading from to reduce query latency.
  3. Failover should be very quick, on the order of seconds.

Decisions, decisions…

When the requirement is a speedy failover, this usually implies a hot standby: a secondary instance of the service running but not actively processing records. This standby is ready to take over immediately if the primary is in trouble.

But how do we know when to failover? This type of distributed decision making usually calls for tools like Zookeeper or EtcD. These are systems that solve a hard problem known as distributed consensus; that is, how do we make sure services have a consistent view of the world while staying reasonably available when faced with network failures. At PagerDuty we’ve been using Zookeeper for many years with other systems, we understand it and operationally it’s been rock solid. For us it’s the natural choice.

Zookeeper provides two useful concepts which we will use to build our failover implementation: ephemeral nodes and watches. An ephemeral node is an entity whose life is tied to the life of your Zookeeper session. A client connects to Zookeeper and writes the ephemeral node /failover. Once the client’s session expires (through a session timeout), Zookeeper ensures that /failover disappears.

A watch notifies you when the state of a node changes. For example, you can set a watch on the /failover node and it will trigger when the node comes into existence, or if it already exists, trigger when the node disappears. It’s important to be aware that a watch will trigger at most once. Once triggered, the client will need to reset the watch if they still wish to be notified of future changes.

The Barrier Pattern

With these Zookeeper primitives we have a way to make decisions and make sure all interested parties have a consistent view of the world. We now need to decide on the failover model for Db2Kafka. After reviewing a few options, we’ve settled on using the Barrier pattern.

In the context of distributed computing, the Barrier pattern refers to blocking a set of nodes until some condition is met. We can specialize this idea in the context of failover of a singleton service: we want to block the secondary whenever the primary is up.

Here’s a quick sketch of the workflow:

As a primary instance life is simple. On startup it writes the ephemeral node /failover into Zookeeper. If the write is successful, it then proceeds to do work, if the write failed, it will retry until it is successful.

The secondary instance has a bit more to keep track of. On startup, it sets a watch on /failover. Setting a watch will also tell the secondary the current state of the node. If the node exists at the time the watch is set, the secondary will block (not process records) until the watch triggers. If the watch does trigger, the ephemeral node will have disappeared, meaning the primary is in trouble. the secondary instance can safely start processing records.

If the ephemeral node does not exist, this means that the primary instance is down and the secondary instance should start processing messages. This time, if the watch triggers, the secondary must stop doing work as this means that the primary instance has come back up.

Why do we use this barrier pattern rather than a tried and tested distributed lock? While locks are conceptually simpler and don’t require us to run the primary and secondary instances in different modes, they also don’t allow us to encode a preference for a datacenter. This is one of our requirements.

For more details on implementing the barrier pattern with Zookeeper see here.

CrAP, Network Partitions are a Thing!

When building distributed systems, it is important to check how your system will behave in the presence of network partitions. CAP theorem tells us that a network partition forces us to make a tradeoff between consistency and availability. This system is fairly simple: we have just four cases to consider.

No Partitions

In the case of no network partitions, it’s business as usual, nothing to see here.

Partition between secondary instance and Zookeeper

In this case, the primary instance is able to connect to Zookeeper, create the ephemeral node, and will begin doing work. Since the secondary cannot make a connection to Zookeeper, it should block and not process any messages.

Partition between primary instance and Zookeeper

Here, the primary instance was not able to create the barrier node, so it did not start processing records. When the secondary instance connects to Zookeeper, it will notice that the ephemeral node does not exist and begins processing records.

Neither primary nor secondary can connect to Zookeeper

In this case, we are effectively down. Neither the primary nor the secondary instance is able to connect to Zookeeper, and so neither may begin processing messages.

If we prefered a more highly available model, we could amend our plan to have the primary Db2Kafka instance process messages regardless of its connection to Zookeeper. This would mean that we could have two instances of Db2Kafka processing messages if only the primary is partitioned.

Db2Kafka currently has a message ordering guarantee that may be violated if more than one Db2Kafka instance is processing messages. If we were to accept this amended plan we would need to also weaken this ordering guarantee. However, this fourth case of a full Zookeeper/Db2Kafka partition is much less likely to occur than the other cases with only a partial partition.

Our Zookeeper cluster spans all of our datacenters and will survive one datacenter going down with no impact on availability, so rather than inflict downstream services with duplicate, out of order, messages more often, we’ve opted to accept the chance of downtime for Db2Kafka in the unlikely case we have full Zookeeper isolation.

Failure Mode Testing

At PagerDuty we have a strong culture of chaos engineering, large features and services are not considered done until they’ve undergone at least one Failure Friday.

Db2Kafka was put through a few different failure scenarios to validate our assumptions on its behaviour. Part of the testing involved simulating network partitions between Zookeeper and Db2Kafka. In the graph above we see the number of records being processed by Db2Kafka color coded by datacenter.

Db2Kafka is Open Source!

This service solves an important problem for us at PagerDuty, with this new addition to Db2Kafka, we’re now ready to use it for more critical systems as well. Db2Kafka is open source, so if you think there may be a place for it in your organization (or you are just itching to read some code) see below.

This project was completed during my internship on PagerDuty’s Core team. Thanks to the team for providing detailed code and design reviews, the result is better for it! If you’ve found this work interesting and want to help us out, join us!

Comment on this post