By Prudhviraj Karumanchi, Samuel Fu, Sriram Rangarajan, Vidhya Arvind, Yun Wang, John Lu
Introduction
Netflix operates at massive scale, serving hundreds of millions of users with diverse content and features. Behind the scenes, ensuring data consistency, reliability, and efficient operations across various services is a continuous challenge. At the heart of many critical functions lies the concept of a Write-Ahead Log (WAL) abstraction. At Netflix scale, every challenge gets amplified. Some of the key challenges we encountered include:
- Accidental data loss and data corruption in databases
- System entropy across different datastores (e.g., writing to Cassandra and Elasticsearch)
- Handling updates to multiple partitions (e.g., building secondary indices on top of a NoSQL database)
- Data replication (in-region and across regions)
- Reliable retry mechanisms for real-time data pipelines at scale
- Bulk deletes to the database causing OOM on the Key-Value nodes
All of the above challenges either resulted in production incidents or outages, consumed significant engineering resources, or led to bespoke solutions and technical debt. During one particular incident, a developer issued an ALTER TABLE command that led to data corruption. Fortunately, the data was fronted by a cache. Being able to quickly extend the cache TTL, together with the app writing its mutations to Kafka, allowed us to recover. Without those resilience features in the application, there would have been permanent data loss. As the data platform team, we needed to provide resilience and guarantees that protect not just this application, but all the critical applications we have at Netflix.
Regarding retry mechanisms for real-time data pipelines: at Netflix's scale, failures (network errors, downstream service outages, etc.) are inevitable. We needed a reliable and scalable way to retry failed messages without sacrificing throughput.
With these problems in mind, we decided to build a system that would solve all of the aforementioned issues and continue to serve Netflix's future needs in the online data platform space. Our Write-Ahead Log (WAL) is a distributed system that captures data changes, provides strong durability guarantees, and reliably delivers those changes to downstream consumers. This blog post dives into how Netflix is building a generic WAL solution. The goals are to address common data challenges, to improve developer efficiency, and to power high-leverage capabilities: secondary indices, cross-region replication for non-replicated storage engines, and widely used patterns like delayed queues.
API
Our API is intentionally simple and exposes just the essential parameters. WAL has one main API endpoint, WriteToLog. It abstracts away the internal implementation so that users can onboard easily.
rpc WriteToLog (WriteToLogRequest) returns (WriteToLogResponse) {...}

/**
 * WAL request message
 * namespace: Identifier for a particular WAL
 * lifecycle: How much delay to set, and the original write time
 * payload: Payload of the message
 * target: Details of where to send the payload
 */
message WriteToLogRequest {
  string namespace = 1;
  Lifecycle lifecycle = 2;
  bytes payload = 3;
  Target target = 4;
}

/**
 * WAL response message
 * durable: Whether the request succeeded, failed, or is unknown
 * message: Reason for failure
 */
message WriteToLogResponse {
  Trilean durable = 1;
  string message = 2;
}
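To make the request and response shapes concrete, here is a minimal Python sketch that mirrors the two messages as dataclasses. Several details are assumptions for illustration, because the post does not define them:

- the `Trilean` values (`TRUE`, `FALSE`, `UNKNOWN`);
- the fields inside `Lifecycle` and `Target`;
- the `should_retry` policy.

```python
from dataclasses import dataclass, field
from enum import Enum


class Trilean(Enum):
    # Assumed three-valued result: durable, definitely failed, or unknown (e.g. timeout).
    TRUE = "TRUE"
    FALSE = "FALSE"
    UNKNOWN = "UNKNOWN"


@dataclass
class Lifecycle:
    # Hypothetical fields: requested delay and original write time (epoch millis).
    delay_seconds: int = 0
    original_write_time_ms: int = 0


@dataclass
class Target:
    # Hypothetical fields describing where the consumer should deliver the payload.
    type: str = ""
    endpoint: str = ""


@dataclass
class WriteToLogRequest:
    namespace: str
    payload: bytes
    lifecycle: Lifecycle = field(default_factory=Lifecycle)
    target: Target = field(default_factory=Target)


@dataclass
class WriteToLogResponse:
    durable: Trilean
    message: str = ""


def should_retry(resp: WriteToLogResponse) -> bool:
    # Only TRUE means the write is known to be durable. Retrying on FALSE or
    # UNKNOWN may create a duplicate, which targets must tolerate anyway because
    # WAL delivery is at-least-once.
    return resp.durable is not Trilean.TRUE


req = WriteToLogRequest(
    namespace="pds",
    payload=b'{"event": "title-updated"}',
    lifecycle=Lifecycle(delay_seconds=30),
)
resp = WriteToLogResponse(durable=Trilean.UNKNOWN, message="timeout")
print(should_retry(resp))  # True
```

Field order in the dataclasses differs from the proto field numbers only because Python requires defaulted fields to come last.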
A namespace defines where and how data is stored. It provides logical separation while abstracting the underlying storage systems. Each namespace can be configured to use different queues: Kafka, SQS, or combinations of several. The namespace also serves as the central place for settings such as the backoff multiplier and the maximum number of retry attempts. This flexibility allows our Data Platform to route each use case to the most suitable storage system based on its performance, durability, and consistency needs.
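A namespace carries retry settings such as the backoff multiplier and the maximum number of retry attempts. The sketch below shows how those values could turn into a concrete retry schedule. The key names (`initial_delay_secs`, `backoff_multiplier`, `max_retry_attempts`, `max_delay_secs`) and their defaults are hypothetical; the post does not publish WAL's actual setting names.

```python
from typing import List


def retry_delays(config: dict) -> List[float]:
    """Return the delay (seconds) before each retry, using exponential backoff.

    Hypothetical namespace keys:
      initial_delay_secs - delay before the first retry
      backoff_multiplier - factor applied to the delay after each attempt
      max_retry_attempts - retries before giving up (e.g. routing to the DLQ)
      max_delay_secs     - optional ceiling on any single delay
    """
    delay = float(config.get("initial_delay_secs", 1.0))
    multiplier = float(config.get("backoff_multiplier", 2.0))
    attempts = int(config.get("max_retry_attempts", 3))
    cap = float(config.get("max_delay_secs", float("inf")))

    delays = []
    for _ in range(attempts):
        delays.append(min(delay, cap))
        delay *= multiplier
    return delays


namespace_config = {
    "initial_delay_secs": 1,
    "backoff_multiplier": 2,
    "max_retry_attempts": 5,
    "max_delay_secs": 10,
}
print(retry_delays(namespace_config))  # [1.0, 2.0, 4.0, 8.0, 10.0]
```

Keeping the schedule in namespace configuration means operators can tune it per use case without client code changes.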
WAL can take on different personas depending on the namespace configuration.
Persona #1 (Delayed Queues)
In the example configuration below, the Product Data Systems (PDS) namespace uses SQS as the underlying message queue, which enables delayed messages. PDS uses Kafka extensively, and failures (network errors, downstream service outages, etc.) are inevitable. PDS needed a reliable and scalable way to retry failed messages without sacrificing throughput, and that is when it started leveraging WAL for delayed messages.
"persistenceConfigurations": {
  "persistenceConfiguration": [
    {
      "physicalStorage": {
        "type": "SQS"
      },
      "config": {
        "wal-queue": [
          "dgwwal-dq-pds"
        ],
        "wal-dlq-queue": [
          "dgwwal-dlq-pds"
        ],
        "queue.poll-interval.secs": 10,
        "queue.max-messages-per-poll": 100
      }
    }
  ]
}

Persona #2 (Generic Cross-Region Replication)
Below is the namespace configuration for cross-region replication of EVCache using WAL, which replicates messages from a source region to multiple destinations. It uses Kafka under the hood.
"persistence_configurations": {
  "persistence_configuration": [
    {
      "physical_storage": {
        "type": "KAFKA"
      },
      "config": {
        "consumer_stack": "consumer",
        "context": "This is for cross region replication for evcache_foobar",
        "target": {
          "euwest1": "dgwwal.foobar.cluster.eu-west-1.netflix.net",
          "type": "evc-replication",
          "useast1": "dgwwal.foobar.cluster.us-east-1.netflix.net",
          "useast2": "dgwwal.foobar.cluster.us-east-2.netflix.net",
          "uswest2": "dgwwal.foobar.cluster.us-west-2.netflix.net"
        },
        "wal-kafka-dlq-topics": [],
        "wal-kafka-topics": [
          "evcache_foobar"
        ],
        "wal.kafka.bootstrap.servers.prefix": "kafka-foobar"
      }
    }
  ]
}

Persona #3 (Handling multi-partition mutations)
Below is the namespace configuration for supporting the mutateItems API in Key-Value. Here, multiple write requests can go to different partitions and need to be eventually consistent. The key detail in this configuration is the presence of both Kafka and durable_storage. These data stores are required to provide two-phase commit semantics, which we discuss in detail below.
"persistence_configurations": {
  "persistence_configuration": [
    {
      "physical_storage": {
        "type": "KAFKA"
      },
      "config": {
        "consumer_stack": "consumer",
        "contacts": "unknown",
        "context": "WAL to support multi-id/namespace mutations for dgwkv.foobar",
        "durable_storage": {
          "namespace": "foobar_wal_type",
          "shard": "walfoobar",
          "type": "kv"
        },
        "target": {},
        "wal-kafka-dlq-topics": [
          "foobar_kv_multi_id-dlq"
        ],
        "wal-kafka-topics": [
          "foobar_kv_multi_id"
        ],
        "wal.kafka.bootstrap.servers.prefix": "kaas_kafka-dgwwal_foobar7102"
      }
    }
  ]
}

An important note: because of the underlying implementation, requests to WAL have at-least-once semantics.
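Because delivery is at-least-once, a target can see the same message more than once. This can happen, for example, if a consumer crashes after delivering a message but before committing its progress. A common way to cope is to make the apply step idempotent. The sketch below deduplicates on a per-message ID. Both the `message_id` field and the in-memory `seen` set are assumptions for illustration. A real target would need durable deduplication or a naturally idempotent write, such as an upsert keyed by ID.

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass(frozen=True)
class WalMessage:
    message_id: str  # hypothetical unique ID assigned by the producer
    key: str
    value: str


class IdempotentTarget:
    """Applies each message at most once, even if WAL redelivers it."""

    def __init__(self) -> None:
        self.store: Dict[str, str] = {}
        self.seen: Set[str] = set()  # in-memory only: illustration, not durable

    def apply(self, msg: WalMessage) -> bool:
        if msg.message_id in self.seen:
            return False  # duplicate delivery: skip the side effect
        self.store[msg.key] = msg.value
        self.seen.add(msg.message_id)
        return True


target = IdempotentTarget()
m = WalMessage("m-1", "title:42", "v1")
print(target.apply(m), target.apply(m))  # True False
```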
Under the Hood
The core architecture consists of several key components working together.
Message Producer and Message Consumer separation: The message producer receives incoming messages from client applications and adds them to the queue. The message consumer processes messages from the queue and sends them to the targets. Because of this separation, other systems can bring their own pluggable producers or consumers, depending on their use cases. WAL’s control plane supports this pluggable model, which lets us switch between different message queues depending on the use case.
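The producer/consumer split is easiest to see as two components that share nothing but a queue interface. In this sketch, `MessageQueue` is a hypothetical Python `Protocol`, and `InMemoryQueue` stands in for Kafka or SQS. Swapping the queue means passing a different object, with no change to the producer or consumer code. None of these class names come from WAL itself.

```python
from collections import deque
from typing import Callable, Deque, List, Optional, Protocol


class MessageQueue(Protocol):
    def send(self, message: bytes) -> None: ...
    def receive(self) -> Optional[bytes]: ...


class InMemoryQueue:
    """Stand-in for a Kafka topic or SQS queue (illustration only)."""

    def __init__(self) -> None:
        self._items: Deque[bytes] = deque()

    def send(self, message: bytes) -> None:
        self._items.append(message)

    def receive(self) -> Optional[bytes]:
        return self._items.popleft() if self._items else None


class Producer:
    """Accepts client writes and appends them to whatever queue it was given."""

    def __init__(self, queue: MessageQueue) -> None:
        self.queue = queue

    def write_to_log(self, payload: bytes) -> None:
        self.queue.send(payload)


class Consumer:
    """Drains the queue and hands each message to a delivery function."""

    def __init__(self, queue: MessageQueue, deliver: Callable[[bytes], None]) -> None:
        self.queue = queue
        self.deliver = deliver

    def drain(self) -> int:
        count = 0
        while (msg := self.queue.receive()) is not None:
            self.deliver(msg)
            count += 1
        return count


queue = InMemoryQueue()
delivered: List[bytes] = []
Producer(queue).write_to_log(b"put k1")
Producer(queue).write_to_log(b"put k2")
print(Consumer(queue, delivered.append).drain(), delivered)  # 2 [b'put k1', b'put k2']
```

Because the two sides only meet at the queue, each can also be scaled independently.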
SQS and Kafka with a dead letter queue by default: Every WAL namespace has its own message queue and gets a dead letter queue (DLQ) by default, because there can be both transient errors and hard errors. Application teams using the Key-Value abstraction simply need to toggle a flag to enable WAL. They then get all of this functionality without needing to understand the underlying complexity.
- Kafka-backed namespaces: handle standard message processing
- SQS-backed namespaces: support delayed queue semantics (we added custom logic to go beyond the standard defaults enforced for delay, size limits, etc.)
- Complex multi-partition scenarios: use queues and durable storage
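The split between transient and hard errors is what makes a default DLQ useful: transient failures are worth retrying, hard failures are not. Below is a minimal sketch of that routing decision. It assumes a hypothetical `TransientError`/`HardError` classification and a fixed retry budget. The real classification rules and retry limits are namespace settings that the post does not spell out.

```python
class TransientError(Exception):
    """E.g. a timeout or a briefly unavailable target; a retry may succeed."""


class HardError(Exception):
    """E.g. a malformed payload; retrying will never succeed."""


def process(message, deliver, attempt, max_attempts, retry_queue, dlq):
    """Deliver one message and route failures to a retry queue or the DLQ.

    attempt is the zero-based index of this attempt; max_attempts is the total
    allowed. Returns "delivered", "retry", or "dlq". All names are illustrative.
    """
    try:
        deliver(message)
        return "delivered"
    except TransientError:
        if attempt + 1 < max_attempts:
            retry_queue.append((message, attempt + 1))
            return "retry"
        dlq.append(message)  # retry budget exhausted
        return "dlq"
    except HardError:
        dlq.append(message)  # no point retrying
        return "dlq"


def flaky(msg):
    raise TransientError("target timed out")


def broken(msg):
    raise HardError("cannot decode payload")


retries, dlq = [], []
print(process("m1", flaky, 0, 3, retries, dlq))   # retry
print(process("m1", flaky, 2, 3, retries, dlq))   # dlq
print(process("m2", broken, 0, 3, retries, dlq))  # dlq
```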
Target Flexibility: The messages added to WAL are pushed to the target datastores. Targets can be Cassandra databases, Memcached caches, Kafka queues, or upstream applications. Users can specify the target through the namespace configuration and in the API itself.
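One way to picture target flexibility is a registry that maps a target type to a delivery function. Adding a new kind of target then means registering a function rather than rewriting the consumer. The target type strings and the `Target` shape are hypothetical. In this sketch, a target given in the request overrides the namespace default; that precedence is also an assumption about how the two sources might combine.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Target:
    type: str      # hypothetical type names, e.g. "kafka", "memcached"
    endpoint: str


Deliver = Callable[[str, bytes], None]
_REGISTRY: Dict[str, Deliver] = {}
sent: List[Tuple[str, str, bytes]] = []  # records deliveries for the demo


def register(target_type: str) -> Callable[[Deliver], Deliver]:
    def wrap(fn: Deliver) -> Deliver:
        _REGISTRY[target_type] = fn
        return fn
    return wrap


@register("kafka")
def to_kafka(endpoint: str, payload: bytes) -> None:
    sent.append(("kafka", endpoint, payload))  # a real sender would produce to a topic


@register("memcached")
def to_memcached(endpoint: str, payload: bytes) -> None:
    sent.append(("memcached", endpoint, payload))


def resolve_target(namespace_default: Target, request_target: Optional[Target]) -> Target:
    # Assumption: a target supplied in the request overrides the namespace default.
    return request_target if request_target is not None else namespace_default


def dispatch(target: Target, payload: bytes) -> None:
    try:
        deliver = _REGISTRY[target.type]
    except KeyError:
        raise ValueError(f"no deliverer registered for {target.type!r}") from None
    deliver(target.endpoint, payload)


default = Target("kafka", "events-topic")
dispatch(resolve_target(default, None), b"a")
dispatch(resolve_target(default, Target("memcached", "cache-1")), b"b")
print(sent)  # [('kafka', 'events-topic', b'a'), ('memcached', 'cache-1', b'b')]
```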
Deployment Model
WAL is deployed on the Data Gateway infrastructure. As a result, WAL deployments come with mTLS, connection management, authentication, and runtime and deployment configuration out of the box.
Each Data Gateway abstraction (including WAL) is deployed as a shard. A shard is a physical concept describing a group of hardware instances. Each WAL use case is usually deployed as a separate shard. For example, the Ads Events service sends requests to WAL shard A, while the Gaming Catalog service sends requests to WAL shard B. This separates concerns and avoids noisy neighbour problems.
Each WAL shard can have multiple namespaces. A namespace is a logical concept describing a configuration. Every request to WAL must specify its namespace so that WAL can apply the correct configuration to it. Each namespace has its own configuration of queues, which isolates use cases from one another. If the underlying queue of a WAL namespace becomes the throughput bottleneck, operators can add more queues on the fly by modifying the namespace configuration. The concepts of shards and namespaces are shared across all Data Gateway abstractions, including Key-Value, Counter, and Timeseries. The namespace configurations are stored in a globally replicated relational SQL database to ensure availability and consistency.
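Each request names its namespace, and WAL looks up that namespace's configuration before doing anything else. When operators add queues to relieve a throughput bottleneck, writes then need to spread across the larger queue list. The sketch below assumes a hypothetical in-memory config map and picks a queue by hashing a partition key. The post does not say how WAL actually assigns messages to queues. Note that rehashing after adding a queue can move a key to a different queue, which matters if per-key ordering is required.

```python
import zlib
from typing import Dict, List

# Hypothetical namespace configs; in WAL they live in a globally replicated SQL database.
NAMESPACES: Dict[str, Dict[str, List[str]]] = {
    "pds": {"queues": ["dgwwal-dq-pds"]},
}


def pick_queue(namespace: str, partition_key: str) -> str:
    config = NAMESPACES.get(namespace)
    if config is None:
        raise KeyError(f"unknown namespace {namespace!r}")
    queues = config["queues"]
    # crc32 is stable across processes, unlike Python's salted built-in hash() for str.
    index = zlib.crc32(partition_key.encode("utf-8")) % len(queues)
    return queues[index]


def add_queue(namespace: str, queue_name: str) -> None:
    """Operator action: widen a namespace by adding a queue on the fly."""
    NAMESPACES[namespace]["queues"].append(queue_name)


print(pick_queue("pds", "title:42"))  # dgwwal-dq-pds (only one queue so far)
add_queue("pds", "dgwwal-dq-pds-2")   # hypothetical second queue name
print(pick_queue("pds", "title:42") in NAMESPACES["pds"]["queues"])  # True
```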
Based on CPU and network thresholds, the Producer group and the Consumer group of each shard scale up their instance counts automatically and independently. This keeps latency low and throughput and availability high. WAL, like the other abstractions, also uses the Netflix adaptive load shedding libraries and Envoy to automatically shed requests beyond a certain limit. WAL can be deployed to multiple regions, in which case each region runs its own group of instances.
Solving different flavors of problems with no change to the core architecture
WAL addresses multiple data reliability challenges with no changes to the core architecture:
Data Loss Prevention: If the database goes down, WAL continues to hold the incoming mutations. When the database becomes available again, WAL replays the mutations back to it. The tradeoff is eventual rather than immediate consistency, in exchange for no data loss.
Generic Data Replication: Some systems, such as EVCache (using Memcached) and RocksDB, do not support replication by default. For these, WAL provides systematic replication, both in-region and across regions. The target can be another application, another WAL, or another queue; it is completely pluggable through configuration.
System Entropy and Multi-Partition Solutions: The solution is the same whether you are writing to two databases (like Cassandra and Elasticsearch) or applying mutations across multiple partitions of a single database: write to WAL first, then let the WAL consumer apply the mutations. No separate asynchronous repair jobs are needed, because WAL handles retries and backoff automatically.
Data Corruption Recovery: In case of database corruption, restore the last known good backup, then replay mutations from WAL, omitting the offending write or mutation.
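Data loss prevention and corruption recovery both come down to replaying logged mutations, in order, onto some starting state. A minimal sketch follows. It assumes each logged mutation carries a monotonically increasing sequence number (`seq`) and that the backup records the last sequence number it includes. These fields, and the dictionary standing in for the database, are illustrative rather than WAL's storage format.

```python
from dataclasses import dataclass
from typing import AbstractSet, Dict, Iterable, Optional


@dataclass(frozen=True)
class Mutation:
    seq: int              # hypothetical monotonically increasing sequence number
    key: str
    value: Optional[str]  # None means delete


def replay(backup: Dict[str, str], backup_seq: int,
           log: Iterable[Mutation], skip: AbstractSet[int] = frozenset()) -> Dict[str, str]:
    """Rebuild state from a backup plus logged mutations newer than the backup.

    Mutations whose seq is in `skip` (e.g. the offending write) are omitted.
    """
    state = dict(backup)
    for m in sorted(log, key=lambda m: m.seq):
        if m.seq <= backup_seq or m.seq in skip:
            continue  # already in the backup, or deliberately excluded
        if m.value is None:
            state.pop(m.key, None)
        else:
            state[m.key] = m.value
    return state


log = [
    Mutation(1, "a", "1"),
    Mutation(2, "b", "2"),
    Mutation(3, "a", "CORRUPT"),  # the offending write
    Mutation(4, "c", "4"),
    Mutation(5, "b", None),
]
backup = {"a": "1"}  # last known good backup, taken after seq 1
print(replay(backup, 1, log, skip={3}))  # {'a': '1', 'c': '4'}
```

For recovery after downtime, the same function works with `backup_seq` set to the last mutation the database is known to have applied and an empty `skip` set.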
There are some major differences between using WAL and using Kafka or SQS directly. WAL is an abstraction over the underlying queues, so the underlying technology can be swapped out per use case with no code changes. WAL emphasizes a simple yet effective API that spares users complicated setup and configuration. We use the control plane to switch the technologies behind WAL when needed, without app or client intervention.
WAL usage at Netflix
Delay Queue
The most common use case for WAL is as a Delay Queue. If an application wants to send a request at a certain time in the future, it can offload the request to WAL, which guarantees that the request will land after the specified delay.
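Functionally, a delay queue holds each message until its due time and only then releases it. The in-process sketch below models this with a min-heap ordered by due time and an injectable clock. It is only a model: WAL's SQS-backed namespaces provide the delay with durable queues, and SQS on its own caps per-message delay at 15 minutes. The class and method names are hypothetical. The `max_messages` default of 100 simply mirrors the `queue.max-messages-per-poll` value in the PDS configuration.

```python
import heapq
import itertools
from typing import Callable, List, Tuple


class DelayQueue:
    """In-memory delay queue: messages become visible only after their delay elapses."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock
        self._heap: List[Tuple[float, int, bytes]] = []
        self._counter = itertools.count()  # tie-breaker keeps FIFO order for equal due times

    def put(self, payload: bytes, delay_secs: float) -> None:
        due = self._clock() + delay_secs
        heapq.heappush(self._heap, (due, next(self._counter), payload))

    def poll(self, max_messages: int = 100) -> List[bytes]:
        """Return up to max_messages payloads whose due time has passed."""
        now = self._clock()
        ready: List[bytes] = []
        while self._heap and self._heap[0][0] <= now and len(ready) < max_messages:
            ready.append(heapq.heappop(self._heap)[2])
        return ready


now = [0.0]  # fake clock for a deterministic demo
q = DelayQueue(clock=lambda: now[0])
q.put(b"retry-kafka-send", delay_secs=30)
q.put(b"send-reminder", delay_secs=5)
print(q.poll())  # []
now[0] = 10.0
print(q.poll())  # [b'send-reminder']
now[0] = 31.0
print(q.poll())  # [b'retry-kafka-send']
```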
Netflix’s Live Origin processes and delivers Netflix live stream video chunks. It stores its video data in a Key-Value abstraction backed by Cassandra and EVCache. When Live Origin decides to delete certain video data after an event is completed, it issues delete requests to the Key-Value abstraction. However, the large volume of delete requests arrives in a short burst. This burst interferes with the more important real-time read/write requests, causing performance issues in Cassandra and timeouts for incoming live traffic. To get around this, Key-Value first issues the delete requests to WAL, with a random delay and jitter set for each request. After the delay, WAL sends the delete requests back to Key-Value. Because the deletes now arrive as a flatter curve spread over time, Key-Value can send them to the datastore without issues.
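The effect of random delay and jitter is easy to check numerically. A burst of deletes issued at the same instant gets spread over a window, so the per-second rate the datastore sees drops. The sketch assumes a uniform jitter between 0 and a hypothetical `window_secs`, added to a base delay. The post does not give Key-Value's actual delay distribution or values.

```python
import random
from collections import Counter
from typing import List


def schedule_deletes(n: int, base_delay_secs: float, window_secs: float,
                     rng: random.Random) -> List[float]:
    """Assign each delete a delay of base plus a uniform jitter between 0 and window."""
    return [base_delay_secs + rng.uniform(0, window_secs) for _ in range(n)]


def peak_per_second(delays: List[float]) -> int:
    """Largest number of deletes landing in any single one-second bucket."""
    return max(Counter(int(d) for d in delays).values())


rng = random.Random(7)       # seeded so the demo is reproducible
burst = 10_000               # deletes issued at the same moment
no_jitter = [60.0] * burst   # every delete fires at t=60s
jittered = schedule_deletes(burst, base_delay_secs=60, window_secs=600, rng=rng)

print(peak_per_second(no_jitter))  # 10000
print(peak_per_second(jittered))   # roughly burst / 600 per second, far below 10000
```

The tradeoff is latency: the last deletes complete up to `window_secs` later, which is acceptable for cleanup work that is not on the live read/write path.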
WAL is also used by many services that rely on Kafka to stream events, including Ads, Gaming, and Product Data Systems. Whenever a Kafka request fails for any reason, the client app sends WAL a request to retry the Kafka request after a delay. This abstracts away the backoff-and-retry layer for Kafka from many teams, increasing developer efficiency.
Cross-Region Replication
WAL is also used for global cross-region replication. The WAL architecture is generic and allows any datastore or application to onboard for cross-region replication. Currently, the largest use case is EVCache, and we are working to onboard other storage engines.
EVCache is deployed as clusters of Memcached instances across multiple regions, where the cluster in each region holds the same data. Client apps in each region write, read, and delete data in the EVCache cluster of their own region. To ensure global consistency, the EVCache client in one region replicates write and delete requests to all other regions. To implement this, the EVCache client that originated the request sends it to the WAL corresponding to that EVCache cluster and region.
Since the EVCache client acts as the message producer in this case, WAL only needs to deploy the message consumer groups. One message consumer group is set up for each target region. Each group reads from the Kafka topic and sends the replicated write or delete requests to a Writer group in its target region. The Writer group then applies the request to the EVCache servers in that same region.
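Using the `target` map from the cross-region configuration above, the consumer side of replication amounts to one consumer per destination region, each forwarding to that region's Writer group. The following details are assumptions for illustration:

- the `send_to_writer` callable and the message shape;
- treating each endpoint in the map as the address of a region's Writer group;
- skipping the `type` key and the source region.

```python
import functools
from typing import Callable, Dict, List, Tuple

# The "target" block from the EVCache cross-region namespace configuration above.
TARGET: Dict[str, str] = {
    "euwest1": "dgwwal.foobar.cluster.eu-west-1.netflix.net",
    "type": "evc-replication",
    "useast1": "dgwwal.foobar.cluster.us-east-1.netflix.net",
    "useast2": "dgwwal.foobar.cluster.us-east-2.netflix.net",
    "uswest2": "dgwwal.foobar.cluster.us-west-2.netflix.net",
}

SendFn = Callable[[str, dict], None]


def build_region_consumers(target: Dict[str, str], source_region: str,
                           send_to_writer: SendFn) -> Dict[str, Callable[[dict], None]]:
    """Create one consumer per destination region (hypothetical wiring).

    "type" names the replication kind rather than a region, so it is skipped,
    as is the region where the write originated.
    """
    consumers: Dict[str, Callable[[dict], None]] = {}
    for region, writer_endpoint in target.items():
        if region in ("type", source_region):
            continue
        # Each consumer forwards messages to the Writer group of its own region.
        consumers[region] = functools.partial(send_to_writer, writer_endpoint)
    return consumers


sent: List[Tuple[str, str]] = []
consumers = build_region_consumers(
    TARGET, "useast1", lambda endpoint, msg: sent.append((endpoint, msg["op"])))
for consume in consumers.values():  # in WAL, each would read the Kafka topic on its own
    consume({"op": "set", "key": "k1"})

print(sorted(consumers))  # ['euwest1', 'useast2', 'uswest2']
print(len(sent))          # 3
```

Giving each region its own consumer means a slow or unavailable region delays only its own replication stream.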
The biggest benefit of this approach over our legacy architecture is that we can move the most latency-sensitive applications from a multi-tenant architecture to a single-tenant one. For example, Live Origin has its own dedicated Message Consumer and Writer groups, while a less latency-sensitive service can remain multi-tenant. This reduces the blast radius of issues and prevents noisy neighbor problems.
Multi-Table Mutations
The Key-Value service uses WAL to build the MutateItems API. WAL enables the API's multi-table and multi-id mutations by implementing two-phase commit semantics under the hood. For this discussion, assume that the Key-Value service is backed by Cassandra and that each of its namespaces represents a table in a Cassandra database.
When a Key-Value client issues a MutateItems request to the Key-Value server, the request can contain multiple PutItems or DeleteItems requests. Each of those requests can target different ids and namespaces, that is, different Cassandra tables.
message MutateItemsRequest {
  repeated MutationRequest mutations = 1;

  message MutationRequest {
    oneof mutation {
      PutItemsRequest put = 1;
      DeleteItemsRequest delete = 2;
    }
  }
}

The MutateItems request follows an eventually consistent model. When the Key-Value server returns a success response, it guarantees that every operation within the MutateItemsRequest will eventually complete successfully. Individual put or delete operations may be split into smaller chunks based on request size. A single operation can therefore spawn multiple chunk requests that must be processed in a specific sequence.
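The chunking rule can be sketched as splitting a payload by a size limit and numbering the pieces: one large put becomes several chunk requests that must be applied in order. The 4-byte limit in the demo is deliberately tiny. `chunk_size`, `ChunkRequest`, and its fields are hypothetical; the post does not state Key-Value's chunk size or wire format.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ChunkRequest:
    op_id: str   # which put/delete this chunk belongs to
    seq: int     # position of the chunk; chunks must be applied in this order
    total: int   # number of chunks for the operation
    data: bytes


def chunk_put(op_id: str, value: bytes, chunk_size: int) -> List[ChunkRequest]:
    """Split one put into ordered chunk requests of at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    pieces = [value[i:i + chunk_size] for i in range(0, len(value), chunk_size)] or [b""]
    return [ChunkRequest(op_id, i, len(pieces), p) for i, p in enumerate(pieces)]


def reassemble(chunks: List[ChunkRequest]) -> bytes:
    """Restore the original value, refusing incomplete or duplicated chunk sets."""
    if not chunks:
        raise ValueError("no chunks")
    ordered = sorted(chunks, key=lambda c: c.seq)
    if [c.seq for c in ordered] != list(range(ordered[0].total)):
        raise ValueError("missing or duplicate chunk")
    return b"".join(c.data for c in ordered)


chunks = chunk_put("put-1", b"hello world", chunk_size=4)
print([c.data for c in chunks])            # [b'hell', b'o wo', b'rld']
print(reassemble(list(reversed(chunks))))  # b'hello world'
```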
There are two ways to ensure that Key-Value client requests succeed. The synchronous approach relies on client-side retries until all mutations complete. However, this approach introduces significant challenges:

- Datastores might not natively support transactions, and they provide no guarantee that the entire request succeeds.
- When more than one replica set is involved in a request, latency behaves in unexpected ways, and the entire request chain must be retried.
- Partial failures in synchronous processing can leave the database in an inconsistent state if some mutations succeed while others fail. That requires complex rollback mechanisms or leaves data integrity compromised.

We ultimately adopted the asynchronous approach to address these performance and consistency concerns.
Given Key-Value's stateless architecture, the service cannot maintain mutation success state or guarantee ordering internally. Instead, it relies on the Write-Ahead Log (WAL) to guarantee mutation completion. For each MutateItems request, Key-Value forwards the individual put or delete operations to WAL as they arrive. Each operation is tagged with a sequence number to preserve ordering. After transmitting all mutations, Key-Value sends a completion marker indicating that the full request has been submitted.
The WAL producer receives these messages and persists their content, state, and ordering information to durable storage. The producer then forwards only the completion marker to the message queue. The message consumer retrieves these markers from the queue and reconstructs the complete mutation set by reading the stored state and content, ordering operations by their designated sequence numbers. If a mutation fails, the completion marker is re-queued for a later retry.
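Putting the last two paragraphs together, the flow resembles a two-phase commit:

1. The producer durably stores every mutation with its sequence number.
2. Only a small completion marker is enqueued.
3. The consumer uses the marker to load the whole set and apply it in order.

The sketch below models durable storage and the queue with in-memory structures, and all class and function names are hypothetical. Re-applying the whole set on retry assumes the mutations are idempotent, which a real implementation must ensure because markers can be delivered more than once.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, Dict, List, Tuple


@dataclass
class WalProducer:
    # request_id -> [(sequence number, mutation)]; stands in for durable storage
    durable: Dict[str, List[Tuple[int, str]]] = field(default_factory=dict)
    queue: Deque[str] = field(default_factory=deque)

    def write_mutation(self, request_id: str, seq: int, mutation: str) -> None:
        # Persist content and ordering; nothing is enqueued yet.
        self.durable.setdefault(request_id, []).append((seq, mutation))

    def write_completion_marker(self, request_id: str) -> None:
        # Only the marker goes on the queue, once all mutations are stored.
        self.queue.append(request_id)


def consume_one(producer: WalProducer, apply: Callable[[str], None]) -> bool:
    """Process one marker; on failure, re-queue the marker for a later retry."""
    if not producer.queue:
        return False
    request_id = producer.queue.popleft()
    mutations = sorted(producer.durable[request_id])  # restore the designated order
    try:
        for _, mutation in mutations:
            apply(mutation)
    except Exception:
        producer.queue.append(request_id)  # retry the whole set later
        return False
    return True


wal = WalProducer()
for seq, m in [(2, "put t2:b"), (1, "put t1:a"), (3, "delete t1:c")]:  # arrival order may differ
    wal.write_mutation("req-1", seq, m)
wal.write_completion_marker("req-1")

applied: List[str] = []
failures = iter([True])  # fail the first apply call once, then succeed


def flaky_apply(m: str) -> None:
    if next(failures, False):
        raise RuntimeError("transient target error")
    applied.append(m)


print(consume_one(wal, flaky_apply))  # False (marker re-queued)
print(consume_one(wal, flaky_apply))  # True
print(applied)                        # ['put t1:a', 'put t2:b', 'delete t1:c']
```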
Closing Thoughts
Building Netflix’s generic Write-Ahead Log system has taught us several key lessons that guided our design decisions:
Pluggable Architecture is Core: WAL supports different targets (databases, caches, queues, or upstream applications) through configuration rather than code changes. That ability has been fundamental to its success across diverse use cases.
Leverage Existing Building Blocks: We already had control plane infrastructure, Key-Value abstractions, and other components in place. Building on these existing abstractions let us focus on the unique challenges WAL needed to solve.
Separation of Concerns Enables Scale: By separating message processing from consumption and letting each component scale independently, we can handle traffic surges and failures more gracefully.
Systems Fail, So Consider Tradeoffs Carefully: WAL itself has failure modes, including traffic surges, slow consumers, and non-transient errors. We use abstractions and operational strategies such as data partitioning and backpressure signals to handle them, but the tradeoffs must be understood.
Future work
- We plan to add secondary indices to the Key-Value service, built on WAL.
- A service can also use WAL to guarantee that requests reach multiple datastores at the same time, for example a database and its backup, or a database and a queue.
Acknowledgements
Launching WAL was a collaborative effort involving multiple teams at Netflix, and we are grateful to everyone who helped make this idea a reality. We would like to thank the following teams for their roles in this launch.
- Caching team: Additional thanks to Shih-Hao Yeh and Akashdeep Goel for contributing to cross-region replication for KV, EVCache, etc., and for owning this service.
- Product Data Systems team: Carlos Matias Herrero and Brandon Bremen, for contributing to the delay queue design and for being early adopters of WAL who gave valuable feedback.
- KeyValue and Composite abstractions team: Raj Ummadisetty, for feedback on API design and for the mutateItems design discussions. Rajiv Shringi, for feedback on API design.
- Kafka and Real Time Data Infrastructure teams: Nick Mahilani, for feedback and input on integrating the WAL client into the Kafka client. Sundaram Ananthanarayan, for design discussions around leveraging Flink for some of the WAL use cases.
- Joseph Lynch, for providing strategic direction and organizational support for this project.
