By Ben Sykes
In an earlier post, we described how Netflix uses Apache Druid to ingest millions of events per second and query trillions of rows, providing the real-time insights needed to ensure a high-quality experience for our members. Since that post, our scale has grown considerably.
With our database holding over 10 trillion rows and regularly ingesting up to 15 million events per second, the value of our real-time data is undeniable. But this massive scale introduced a new challenge: queries. The live show monitoring, dashboards, automated alerting, canary analysis, and A/B test monitoring built on top of Druid became so heavily relied upon that the repetitive query load started to become a scaling concern in itself.
This post describes an experimental caching layer we built to address this problem, and the trade-offs we chose to accept.
The Problem
Our internal dashboards are heavily used for real-time monitoring, especially during high-profile live shows or global launches. A typical dashboard has 10+ charts, each triggering several Druid queries; one popular dashboard with 26 charts and stats generates 64 queries per load. When dozens of engineers view the same dashboards and metrics for the same event, the query volume quickly becomes unmanageable.
Take the popular dashboard above: 64 queries per load, refreshing every 10 seconds, viewed by 30 people. That's 192 queries per second from one dashboard, mostly for nearly identical data. We still need Druid capacity for automated alerting, canary analysis, and ad-hoc queries. And because these dashboards request a rolling last-few-hours window, each refresh changes slightly as the time range advances.
Druid's built-in caches, both the full-result cache and the per-segment cache, are effective, but neither is designed to handle the continuous, overlapping time-window shifts inherent to rolling-window dashboards. The full-result cache misses for two reasons:
- If the time window shifts even slightly, the query is different, so it's a cache miss.
- Druid deliberately refuses to cache results that involve realtime segments (those still being indexed), because it values deterministic, stable cache results and query correctness over a higher cache hit rate.
The per-segment cache does help avoid redundant scans on historical nodes, but we still need to collect those cached segment results from each data node and merge them in the brokers with data from the realtime nodes for every query.
During major shows, rolling-window dashboards can generate a flood of near-duplicate queries that Druid's caches mostly miss, creating heavy redundant load. At our scale, solving this by simply adding more hardware is prohibitively expensive.
We needed a smarter approach.
The Insight
When a dashboard requests the last 3 hours of data, the vast majority of that data, everything except the most recent few minutes, is already settled. The data from 2 hours ago won't change.
What if we could remember the older portions of the result and only ask Druid for the part that's actually new?
This is the core idea behind a new caching service that understands the structure of Druid queries and serves previously-seen results from cache while fetching only the freshest portion from Druid.
A Deliberate Trade-Off
Before diving into the implementation, it's worth being explicit about the trade-off we're making. Caching query results introduces some staleness, specifically, up to 5 seconds for the most recent data. That's acceptable for most of our operational dashboards, which refresh every 10 to 30 seconds. In practice, many of our queries already set an end time of now-1m or now-5s to avoid the "flappy tail" that can occur with currently-arriving data.
Since our end-to-end data pipeline latency is typically under 5 seconds at P90, a 5-second cache TTL on the freshest data introduces negligible additional staleness on top of what's already inherent in the system. We decided it was better to accept this small amount of staleness in exchange for significantly lower query load on Druid. But a 5-second cache by itself is not very useful.
Exponential TTLs
Not all data points are equally trustworthy. In real-time analytics, there is a well-known late-arriving data problem. Events can arrive out of order or be delayed in the ingestion pipeline. A data point from 30 seconds ago might still change as late-arriving events trickle in. A data point from 30 minutes ago is almost certainly final.
We use this observation to set cache TTLs that increase exponentially with the age of the data. Data less than 2 minutes old gets a minimum TTL of 5 seconds. After that, the TTL doubles for each additional minute of age: 10 seconds at 2 minutes old, 20 seconds at 3 minutes, 40 seconds at 4 minutes, and so on, up to a maximum TTL of 1 hour.
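The TTL schedule above can be sketched as a small function. The constants (5-second floor, doubling per minute from 2 minutes of age, 1-hour cap) come from the description; the function name and signature are illustrative, not our actual implementation.

```python
MIN_TTL_SECONDS = 5          # freshest data: re-fetched within 5 seconds
MAX_TTL_SECONDS = 60 * 60    # fully settled data: kept up to 1 hour

def bucket_ttl_seconds(age_seconds: float) -> int:
    """TTL for a cached time bucket, given the age of the data it holds."""
    age_minutes = int(age_seconds // 60)
    if age_minutes < 2:
        return MIN_TTL_SECONDS
    # Doubles for each additional minute: 10 s at 2 min, 20 s at 3 min, ...
    ttl = MIN_TTL_SECONDS * (2 ** (age_minutes - 1))
    return min(ttl, MAX_TTL_SECONDS)
```

For example, a bucket holding data from 3 minutes ago is cached for 20 seconds, while one from 2 hours ago is cached for the full hour.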
The effect is that fresh data cycles through the cache rapidly, so any corrections from late-arriving events in the most recent couple of minutes are picked up quickly. Older data lingers far longer, because our confidence in its accuracy grows with time.
For a 3-hour rolling window, the exponential TTL ensures the vast majority of the query is served from the cache, leaving Druid to scan only the most recent, unsettled data.
Bucketing
If we were to use a single-level cache key for the query and interval, similar to Druid's existing result-level cache, we wouldn't be able to extract only the relevant time range from cached results. A shifted window means a different key, which means a cache miss.
Instead, we use a map-of-maps. The top-level key is the query hash without the time interval; the inner keys are timestamps bucketed to the query granularity (or 1 minute, whichever is larger) and encoded as big-endian bytes so lexicographic order matches time order. This enables efficient range scans: fetching all cached buckets between times A and B for a query hash. A 3-hour query at 1-minute granularity becomes 180 independent cached buckets, each with its own TTL; when the window shifts (e.g., 30 seconds later), we reuse most buckets from cache and only query Druid for the new data.
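A minimal sketch of the inner-key encoding: floor the timestamp to the bucket size (the query granularity, with a 1-minute minimum) and pack it big-endian so that sorting the raw key bytes sorts by time. The function name and millisecond units are assumptions for illustration.

```python
import struct

def bucket_key(epoch_ms: int, granularity_ms: int) -> bytes:
    """Inner cache key for one time bucket: the bucket's start time,
    floored to the query granularity (or 1 minute, whichever is larger)
    and packed as an 8-byte big-endian integer, so lexicographic order
    over the key bytes matches chronological order."""
    bucket_ms = max(granularity_ms, 60_000)
    start_ms = (epoch_ms // bucket_ms) * bucket_ms
    return struct.pack(">q", start_ms)
```

Because byte order matches time order, a range scan over inner keys from A to B returns exactly the buckets inside that window.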
How It Works
Today, the cache runs as an external service integrated transparently by intercepting requests at the Druid Router and redirecting them to the cache. If the cache fully satisfies a request, it returns the result; otherwise it shrinks the time interval to the uncached portion and calls back into the Router, bypassing the redirect, to query Druid normally. Non-cacheable requests (e.g., metadata queries or queries without time group-bys) pass straight through to Druid unchanged.
This intercepting proxy design lets us enable or disable caching without any client changes and was key to its adoption. We see this setup as temporary while we work out a way to integrate this capability into Druid more natively.
When a cacheable query arrives, that is, one grouping by time (timeseries, groupBy), the cache performs the following steps.
Parsing and Hashing. We parse each incoming query to extract the time interval, granularity, and structure, then compute a SHA-256 hash of the query with the time interval and parts of the context removed. That hash is the cache key: it encodes what is being asked (datasource, filters, aggregations, granularity) but not when, so the same logical query over different overlapping time windows maps to the same cache entry. Some context properties can alter the response structure or contents, so those are included in the cache key.
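The hashing step might look roughly like this sketch, which strips the intervals and all context keys except those that change the response shape. The `RESPONSE_SHAPING_CONTEXT` set here is illustrative, not Druid's actual list of such properties.

```python
import hashlib
import json

# Illustrative: context keys that can change the response shape must stay
# in the key; volatile ones (query IDs, timeouts) are dropped.
RESPONSE_SHAPING_CONTEXT = {"skipEmptyBuckets", "grandTotal"}

def cache_key(native_query: dict) -> str:
    """SHA-256 over the query with its intervals and volatile context
    removed, so shifted windows of the same logical query share a key."""
    stripped = dict(native_query)
    stripped.pop("intervals", None)
    context = stripped.pop("context", {})
    stripped["context"] = {
        k: v for k, v in context.items() if k in RESPONSE_SHAPING_CONTEXT
    }
    canonical = json.dumps(stripped, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Two queries that differ only in their time window (or in a dropped context key like a query ID) produce the same key; changing the datasource, filters, or aggregations produces a different one.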
Cache Lookup. Using the cache key, we fetch cached points within the requested range, but only if they are contiguous from the start. Because bucket TTLs can expire unevenly, gaps can appear; when we hit a gap, we stop and fetch all newer data from Druid. This ensures a complete, unbroken result set while sending at most one Druid query, rather than "filling gaps" with multiple small, fragmented queries that could increase Druid load.
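The contiguous-prefix rule can be sketched as follows; the function name and the representation of buckets as plain timestamps are assumptions for illustration.

```python
def cached_prefix(requested_buckets, cached_buckets):
    """Split the requested bucket timestamps (sorted ascending) into the
    contiguous cached prefix and the first missing bucket. We stop at the
    first gap rather than filling individual holes, so at most one Druid
    query is issued per request."""
    prefix = []
    for ts in requested_buckets:
        if ts not in cached_buckets:
            return prefix, ts   # fetch everything from ts onward from Druid
        prefix.append(ts)
    return prefix, None         # full cache hit, no Druid query needed
```

Note that a bucket cached *after* a gap is ignored: the single Druid query for the tail will simply refresh it.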
Fetching the Missing Tail. On a partial cache hit (e.g., 2h 50m of a 3h window), we rebuild the query with a narrowed interval for the missing 10 minutes and send only that to Druid. Since Druid then scans just the recent segments for a small time range, the query is usually faster and cheaper than the original.
Combining. The cached data and fresh data are concatenated, sorted by timestamp, and returned to the client. From the client's perspective, the response looks identical to what Druid would have returned: same JSON format, same fields.
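The combining step is deliberately simple; a sketch, assuming result rows are dicts carrying a `timestamp` field as in Druid's timeseries JSON results:

```python
def combine(cached_rows, fresh_rows):
    """Concatenate cached and freshly fetched result rows and sort by
    timestamp, so the merged response is ordered exactly as a single
    Druid response over the full window would be."""
    return sorted(cached_rows + fresh_rows, key=lambda row: row["timestamp"])
```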
Asynchronous Caching. The fresh data from Druid is parsed into individual time-granularity buckets and written back to the cache asynchronously, so we don't add latency to the response path.
Negative Caching
Some metrics are sparse. Certain time buckets may genuinely have no data. Without special handling, the cache would treat these empty buckets as gaps and re-query Druid for them every time.
We handle this by caching empty sentinel values for time buckets where Druid returned no data. Our gap-detection logic recognizes these empty entries as valid cached data rather than missing data, preventing needless re-queries for naturally sparse metrics.
However, we are careful not to negative-cache trailing empty buckets. If a query returns data up to minute 45 and nothing after, we only cache empty entries for gaps between data points, not after the last one. This avoids incorrectly caching "no data" for time periods where events simply haven't arrived yet, which would exacerbate the chart delays caused by late-arriving data.
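The write-back rule, including the trailing-bucket exception, can be sketched like this; the sentinel value and function shape are illustrative.

```python
EMPTY = "<empty>"  # sentinel cached for buckets Druid confirmed are empty

def buckets_to_write(requested_buckets, druid_results):
    """Decide which buckets to write back to the cache. Empty buckets
    *between* data points get the sentinel, so sparse metrics don't look
    like gaps on the next lookup. Empty buckets *after* the last data
    point are left uncached, since late-arriving events may fill them."""
    if not druid_results:
        return {}
    last_with_data = max(druid_results)       # newest bucket that has data
    writes = {}
    for ts in requested_buckets:
        if ts in druid_results:
            writes[ts] = druid_results[ts]
        elif ts < last_with_data:
            writes[ts] = EMPTY                # confirmed-empty interior gap
    return writes
```

So if Druid returns data for minutes 0 and 2 of a 4-minute tail, minute 1 is cached as empty, but minute 3 stays uncached and will be re-fetched.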
The Storage Layer
For the backing store, we use Netflix's Key-Value Data Abstraction Layer (KVDAL), backed by Cassandra. KVDAL provides a two-level map abstraction, a natural fit for our needs. The outer key is the query hash, and the inner keys are timestamps. Crucially, KVDAL supports independent TTLs on each inner key-value pair, eliminating the need for us to manage cache eviction manually.
This two-level structure gives us efficient range queries over the inner keys, which is exactly what we need for partial cache lookups: "give me all cached buckets between time A and time B for query hash X."
Results
The biggest win comes during high-volume events (e.g., live shows): when many users view the same dashboards, the cache serves most identical queries as full hits, so the query rate reaching Druid is essentially the same with 1 viewer or 100. The scaling bottleneck moves from Druid's query capacity to the much cheaper-to-scale cache, and with ~5.5 ms P90 cache responses, dashboards load faster for everyone.
On a typical day, 82% of real user queries get at least a partial cache hit, and 84% of result data is served from cache. As a result, the queries that reach Druid scan much narrower time ranges, touching fewer segments and processing less data, freeing Druid to focus on aggregating the most recent data instead of repeatedly re-querying historical segments.
An experiment validated this, showing about a 33% drop in queries to Druid and a 66% improvement in overall P90 query times. It also cut result bytes and segments queried, and in some cases enabling the cache reduced result bytes by more than 14x. Caveat: the size of these gains depends heavily on how similar and repetitive the query workload is.
Looking Ahead
This caching layer is still experimental, but results are promising and we're exploring next steps. We've added partial support for templated SQL so dashboard tools can benefit without writing native Druid queries.
Longer term, we'd like interval-aware caching to be built into Druid: an external proxy adds infrastructure to manage, extra network hops, and workarounds (like SQL templating) to extract intervals. Implemented inside Druid, it would be more efficient, with direct access to the query planner and segment metadata, and would benefit the broader community without custom infrastructure. We'd likely ship it as an opt-in, configurable, result-level cache in the Brokers, with metrics to tune TTLs and measure effectiveness. Please leave a comment if you have a use case that would benefit from this feature.
More broadly, this technique, splitting time-series results into independently cached, granularity-aligned buckets with age-based exponential TTLs, isn't Druid-specific and could apply to any time-series database with frequent overlapping-window queries.
Summary
As more Netflix teams rely on real-time analytics, query volume grows too. Dashboards are essential at our scale, but their popularity can become a scaling bottleneck. By inserting an intelligent cache between dashboards and Druid, one that understands query structure, breaks results into granularity-aligned buckets, and trades a small amount of staleness for much lower Druid load, we've increased query capacity without scaling infrastructure proportionally, and we hope to bring these benefits to the Druid community soon as a built-in Druid feature.
Sometimes the best way to handle a flood of queries is to stop answering the same question twice.
