From "count everything and sort" to a sharded, sketched, sub-100ms ranking engine — how YouTube Trending, Twitter top tweets, and Google top searches actually compute the leaderboard
This deep-dive applies the standard HLD interview framework. As you read, map each section to Requirements → Entities → APIs → High-Level Design → Deep Dives, and notice which of the 8 common patterns and key technologies are at play.
Imagine Raj opens YouTube on a Saturday morning. The homepage greets him with a row labeled "Trending Now" — ten thumbnails, ranked. In the eight seconds it took him to launch the app, somewhere around 800,000 other people clicked play on something. To put ten thumbnails on his screen, a system has to answer: out of the millions of videos that got viewed in the last hour, which ten got viewed the most? The naive answer — "count every view, sort all videos by count, take the top ten" — is mathematically right and operationally impossible. There are billions of videos and millions of views per second. You will not finish counting, let alone sorting, before Raj closes the app and goes to make coffee.
This pattern shows up everywhere on the modern web: Twitter Trending Topics (top hashtags in the last 5 minutes), Google Top Searches (most-searched queries today), Reddit r/all (top posts right now), Spotify Daily Top 50, Amazon Best Sellers, fraud-detection top suspicious IPs. They are all the same problem in disguise — a firehose of events arrives, and someone needs the top K most frequent items over a time window.
The trick the industry settled on is a beautiful one: give up on exact answers. Nobody on Earth verifies whether the trending video at rank 7 is really the seventh most-watched or actually the eighth. Once you accept "approximate is fine", a two-decade-old algorithm called Count-Min Sketch lets you count billions of distinct items in a few megabytes of RAM. Pair it with a min-heap, and the leaderboard practically maintains itself.
Before drawing a single box, pin down what the system must do. Top-K looks deceptively simple ("just count and sort, right?"); the requirements are where you reveal you understand why it isn't.
Two capabilities cover the functional requirements:
- Ingest a stream of events (item_id, timestamp) — video views, tweets, search queries.
- Serve getTopK(window, k) for windows of 1 minute, 1 hour, 24 hours, and all-time.

Numbers drive every architectural choice in Top-K — they are the entire reason the algorithm changes from "naive sort" to "sketch + heap". Do them out loud.
Assume 100,000 video views per second sustained, peaking at 1M/sec during Super Bowl moments. Assume 1 billion distinct videos in the catalog (long tail of historical content). Assume the Trending page is loaded 10,000 times per second globally.
- Ingest rate: 100K events/s steady state, peaking at 1M/s
- Cardinality: ~1B distinct items in the catalog
- Top-K read traffic: 10K req/s of Trending page loads
- K (page size): 10 – 1000
If we logged every single event with (item_id, timestamp) = 16 bytes: 100K × 16B × 86,400s = ~138 GB / day = ~50 TB / year. We do keep this in cold storage for offline analytics — but using it to answer real-time top-K means scanning terabytes per query, which is hopeless.
If we kept exact counts per item: 1B items × (8B id + 8B counter + window metadata) ≈ ~30 GB. Tractable for single-tier storage — but updating and reading at 100K updates/sec while sorting 1B rows on every query is what kills it.
With a Count-Min Sketch approximation: 5 hash functions × 100K columns × 4B counter = ~2 MB per sketch, regardless of how many distinct items show up. We can keep 1,440 sketches (one per minute of the last 24 hours) in ~3 GB of RAM per node. That's the magic.
| Metric | Value | Why it matters |
|---|---|---|
| Ingest rate | 100K/s | Forces stream-processing; one box can't keep up |
| Cardinality | 1B items | Forces probabilistic counters; per-item map is too big to update fast |
| Top-K QPS | 10K/s | Forces a precomputed cache; cannot recompute on every read |
| Sketch size | 2 MB | Why approximation is cheap — fits in L2/L3 cache |
| Cold log | 50 TB/yr | Goes to S3 for offline exact analytics, not real-time |
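If you like to sanity-check the arithmetic, the envelope math above fits in a few lines of Python. The 100K/s, 1B-item, 16-byte-event and 5×100K-sketch figures are the assumptions stated earlier; the 14 bytes of per-item window metadata is an illustrative guess:

```python
# Back-of-envelope checks for the numbers quoted above.
EVENTS_PER_SEC = 100_000          # sustained ingest
DISTINCT_ITEMS = 1_000_000_000    # catalog cardinality
EVENT_BYTES = 16                  # (item_id, timestamp)

raw_log_per_day = EVENTS_PER_SEC * EVENT_BYTES * 86_400
print(f"raw log: {raw_log_per_day / 1e9:.0f} GB/day, "
      f"{raw_log_per_day * 365 / 1e12:.0f} TB/year")    # ~138 GB/day, ~50 TB/year

exact_counts = DISTINCT_ITEMS * (8 + 8 + 14)             # id + counter + window metadata (guess)
print(f"exact counters: ~{exact_counts / 1e9:.0f} GB")   # ~30 GB

sketch = 5 * 100_000 * 4                                  # d=5 rows, w=100K cols, 4-byte counters
print(f"one sketch: {sketch / 1e6:.0f} MB, "
      f"24h of minute sketches: {sketch * 1440 / 1e9:.1f} GB")  # 2 MB, ~2.9 GB
```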
Two endpoints carry all the traffic: one to ingest events (write path, very high QPS), one to read the top-K (read path, much lower QPS but must be fast).
The REST API surface:

```
// Ingest — write path, 100K/sec
// Almost always called server-to-server (not directly from browsers)
POST /api/v1/events
{
  "item_id": "vid_8a3f2b",
  "timestamp": 1715072526123,
  "weight": 1               // optional, default 1
}
→ 202 Accepted              // fire-and-forget, returns immediately

// Read — getTopK, 10K/sec, sub-100ms
GET /api/v1/topk?window=1h&k=10
→ 200 OK
{
  "window": "1h",
  "generated": "2026-05-07T14:02:30Z",
  "items": [
    { "item_id": "vid_8a3f2b", "approx_count": 1240000 },
    { "item_id": "vid_91e0a4", "approx_count": 980000 },
    ...
  ]
}
```
Why 202 Accepted for ingest, not 200 OK? The client (the watch-page service that fired the event) must not be blocked waiting for our pipeline to actually count it. We acknowledge receipt the moment the event lands in Kafka — counting happens asynchronously. If we made the client wait, a Kafka hiccup would slow down every video page on YouTube, which is unacceptable.

Why no getCount(item_id) endpoint? Because a Count-Min Sketch stores counts in shared, colliding cells — you can ask "how many views does this specific video have", but the answer over-estimates. For per-item exact counts you'd hit the warehouse. The sketch is optimized for "what are the top items", not "how many views did this item get".

Forget the architecture for a second. The hard part of Top-K is algorithmic: how do you compute the top K from a high-velocity stream over billions of items, in bounded memory, in milliseconds? Three approaches are worth weighing — only one survives.
Approach 1: exact counts in a database. Keep (item_id, count) rows in Postgres. On every event, UPDATE counts SET count=count+1 WHERE item_id=X. On every getTopK, SELECT * ORDER BY count DESC LIMIT K.
Why it fails: 1B-row sort on every read = seconds per query. 100K writes/sec on a single Postgres = lock contention meltdown. And there is no notion of a sliding time window — counts are forever.
Approach 2: exact counts plus a size-K min-heap. Keep a min-heap of size K. On every event, look up the item's current exact count, push into the heap if it beats the heap's minimum.
Why it fails: O(log K) per event is fast, but to know "current count" you still need an exact per-item counter — back to the 1B-row map problem. And in a sharded world the per-shard heap doesn't compose into the global top-K cleanly when an item is split across shards.
Approach 3: Count-Min Sketch plus a size-K min-heap. Use a probabilistic counter (Count-Min Sketch) of fixed size — a few MB regardless of how many items show up. Keep a min-heap of size K alongside. On every event: increment the sketch, then check if the item's sketched count beats the heap min — if yes, swap.
Why it wins: O(1) update, O(log K) heap maintenance, fixed memory, mergeable across shards. Trade-off: counts over-estimate by a bounded amount with bounded probability. Acceptable for ranking.
This is the section that wins or loses the interview. We build the architecture in three passes: the simplest thing that could plausibly work, why it falls apart, and the production shape where every box justifies itself. The numbers from §3 drive every box.
Sketch the simplest possible system: a single counter table in Postgres, updated by every event, sorted by every query.
Three concrete failures emerge the moment real traffic shows up:
Postgres tops out around 5K–10K row updates per second on commodity hardware. We need 100K. The counts table becomes a lock-contention zoo — viral items have their row updated thousands of times per second, blocking each other. The DB CPU pegs at 100% and writes start timing out.
Every getTopK does ORDER BY count DESC LIMIT 10 over 1B rows. Even with an index on count, maintaining that index under 100K updates/sec is itself a workload — and ordered scans across that many rows take seconds. Your homepage is now a stopwatch.
The counter is monotonically increasing forever. If a video went viral two years ago and got 100M views, it dominates "trending today" forever. There's no notion of "decay" or "last hour". Adding one means storing per-event timestamps, which dumps us back into the 50 TB/year log problem.
Three insights flip the design from "impossible" to "elegant":
Writes go into an event stream (Kafka). Reads hit a precomputed top-K cache. Counting happens between them in stream processors. No request ever blocks on the other workload's resources. This is the same write-path/read-path split that the URL shortener uses, only here the "count and rank" computation is what bridges the two.
Per-shard Count-Min Sketch tracks counts in a few MB regardless of how many distinct items appear. Per-shard min-heap of size K always knows that shard's current top-K contenders. Memory cost stops growing; compute cost per event stays O(log K).
Don't try to "decay" counts continuously — instead keep one sketch per minute (so 60 sketches cover the last hour, 1440 cover last 24h). Querying "last hour" merges the most recent 60 sketches. Old sketches age out and free their memory.
The Kafka topic is partitioned by item_id using consistent hashing. Each Flink task owns one partition's full sketch + heap state. At query time, we read each shard's locally-computed top-K, merge them in the API server, and return the global top-K. Approximate at every step, but correct enough for a homepage.
Now the full picture. Every node in the diagram is numbered; each numbered card below answers what the component is, why it's here, and what would break without it.
① Event producers. The internal services that produce the firehose: the YouTube watch-page service ("a play just started"), the Twitter post service ("a tweet was retweeted"), the Search service ("a query was issued"). Each one fires a small JSON event whenever the relevant action occurs. These are internal producers — events are not posted directly from browsers, because that would let users tamper with counts.
Solves: nothing on its own — but the entire pipeline downstream exists to absorb their throughput. The fact that they emit fire-and-forget (they don't wait for our acknowledgment to render the user's action) is what lets the rest of the system be asynchronous.
② API Gateway. The thin HTTP layer in front of Kafka. Validates the event payload (does it have item_id and timestamp? is the timestamp within sane bounds?), authenticates the producer service, applies per-source rate limits, then publishes to Kafka. Returns 202 within a few milliseconds.
Solves: protecting Kafka from malformed traffic and keeping the producer-facing API stable while the internal stream evolves. Without it, every producer would have to embed Kafka client libs and know broker topology — operationally a nightmare.
③ Kafka. The durable event log that buffers everything. Partitioned by hash(item_id) % N, so all events for the same item land on the same partition — that's what makes per-shard counting possible. Configured with 3× replication so a broker death loses zero events. Retention is short for the hot tier (24 hours is enough — the cold log handles long-term storage).
Solves: everything. Decouples producers from consumers (producers don't care if Flink is healthy). Provides replay (restart Flink from any offset). Provides natural sharding (each partition is one consumer's whole world). Without Kafka, ingest spikes would directly hammer the stream processors and a Flink restart would lose data.
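As a concrete illustration of keying by item, a producer using the kafka-python client might look like the sketch below; the broker address and topic name are placeholders, not part of the design above:

```python
import json
from kafka import KafkaProducer

# Keying by item_id lets Kafka's default partitioner hash the key,
# so every event for the same item lands on the same partition.
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",             # placeholder broker address
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_view(item_id: str, timestamp_ms: int, weight: int = 1) -> None:
    producer.send(
        "view-events",                           # hypothetical topic name
        key=item_id,                             # partition = hash(key) % num_partitions
        value={"item_id": item_id, "timestamp": timestamp_ms, "weight": weight},
    )

publish_view("vid_8a3f2b", 1715072526123)
producer.flush()
```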
④ Flink stream processors. The brain. A cluster of Flink jobs, one task per Kafka partition. Each task consumes events for its partition and updates two pieces of state: (a) a Count-Min Sketch for the current minute's counts, and (b) a min-heap of the top K items observed in the current window. Every event is one O(1) sketch increment plus one O(log K) heap update — fast enough to keep up with the partition's share of 100K/sec.
Solves: the actual counting. Without Flink (or an equivalent stream processor), we'd be storing every event raw and recomputing top-K from scratch on every query — back to the impossible naive design. Flink's checkpointing also provides exactly-once semantics so we don't double-count on restart.
⑤ RocksDB state backend. Embedded RocksDB inside each Flink task holds the per-minute sketches and heaps. Why RocksDB and not pure heap memory? Because over a 24-hour sliding window we keep 1,440 sketches per shard — a few GB per shard, more than fits comfortably in the JVM heap. RocksDB stores it on local SSD with an in-memory cache for the active sketches.
Solves: holding sketch state larger than RAM, and persisting it across crashes. Combined with Flink's periodic checkpoints to S3, this means a Flink task can die and resume in seconds without losing its count history.
⑥ Window Aggregator. A timer-driven function inside Flink that fires once per minute (and once per hour, once per day, depending on the windows we serve). When it fires, it snapshots the current sketch and heap, computes that shard's top-K for that window, and writes it to the Top-K Cache. Old sketches that age out of all configured windows are deleted to free memory.
Solves: the gap between "Flink has live state" and "the API server can read a fast precomputed answer". Without this aggregator, every getTopK would have to ask Flink to compute on demand — adding hundreds of milliseconds and coupling read latency to write throughput.
⑦ Top-K Cache (Redis). A Redis cluster holding precomputed per-shard top-K results, one entry per (window, shard) tuple. Stored as Redis sorted sets (ZSETs) so an API server can read the top K of one shard in a single ZREVRANGE call. TTL matches the window length so stale data evicts itself. Sharded across multiple Redis nodes; replicated for fault tolerance.
Solves: sub-100ms read latency. Every getTopK becomes a few cache reads + a merge — never touches Flink, never touches the cold log. Without this, the read path would be coupled to Flink's processing latency, which spikes whenever event throughput spikes (exactly when users are most likely to load the trending page).
⑧ Aggregator API Server. The stateless service that answers GET /api/v1/topk. On each request it reads each shard's precomputed top-K from Redis (e.g. 8 ZREVRANGE calls in parallel), merges them with a single global heap pass, and returns the top K. Total work: 8 cache reads + an 8K-element merge = under 30ms typical.
Solves: stitching per-shard approximate top-Ks into a global answer. Why merge in the API tier and not in Redis? Because the merge needs cross-shard heap logic that's awkward to express in Redis Lua, and the merge cost (8K items max) is cheap enough to do per request.
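A minimal sketch of that merge step, assuming the per-shard lists have already been fetched from Redis as (item_id, approx_count) pairs:

```python
import heapq

def merge_topk(shard_results: list[list[tuple[str, int]]], k: int) -> list[tuple[str, int]]:
    """Merge per-shard top-K lists into a global top-K.

    Because events for one item all hash to one shard, an item appears in at
    most one shard's list, so a single nlargest over the union is enough.
    """
    merged = [entry for shard in shard_results for entry in shard]
    return heapq.nlargest(k, merged, key=lambda e: e[1])

shards = [
    [("vid_8a3f2b", 1_240_000), ("vid_77c1d9", 310_000)],   # shard 0's precomputed list
    [("vid_91e0a4", 980_000), ("vid_02bb7e", 120_000)],     # shard 1's precomputed list
]
print(merge_topk(shards, 3))
```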
⑨ Cold log (S3). A second consumer of the Kafka topic — a Kafka Connect S3 sink — writes the raw event stream to Parquet files in S3, partitioned by hour. Downstream, a Spark / BigQuery job runs nightly to compute exact top-K, generate dashboards, and feed the recommendations team. The hot path never reads this; only offline analytics does.
Solves: exact counts when we need them (auditing the approximate pipeline, computing weekly leaderboards where exact ranking matters), and the ability to replay events through Flink if we change the sketch parameters or fix a counting bug.
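For contrast with the approximate hot path, the nightly exact job over the Parquet log can be a few lines of PySpark; the S3 path and column names here are illustrative:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("exact-topk").getOrCreate()

# Read one day's worth of raw events and compute the exact top 10.
events = spark.read.parquet("s3://events/2026-05-07/")     # illustrative path
exact_top10 = (
    events.groupBy("item_id")
          .agg(F.sum("weight").alias("views"))
          .orderBy(F.desc("views"))
          .limit(10)
)
exact_top10.show()
```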
⑩ Replay Service. An on-demand tool that re-streams events from S3 (or from an arbitrary Kafka offset) back through a fresh Flink job, used to rebuild sketches after a parameter change ("we want 7 hash functions instead of 5") or to backfill a new time window ("add a 7-day window"). Runs on a separate Flink cluster so it doesn't disturb production.
Solves: the "we changed our mind about sketch parameters" problem, which would otherwise require taking the live system down. Without replay, every algorithmic change would be a one-way migration.
⑪ Monitoring. Tracks the metrics that matter for an approximate system: sketch over-estimation rate (compared periodically against exact counts from the warehouse), per-shard heap accuracy, Kafka consumer lag, Flink checkpoint duration, Redis hit rate. Alerts fire when over-estimation drifts above the configured ε bound.
Solves: the special debugging burden of probabilistic systems — when the trending list looks "off", you need to know whether the sketch widened its error or whether something else broke. Without dedicated monitoring, you can't tell.
Two flows, mapped to the numbered components above.
1. The watch-page service ① fires POST /events { item_id: vid_8a3f2b, timestamp: ... } to the API Gateway ②. Returns 202 in 5ms — Raj's video starts playing instantly, no blocking.
2. The gateway publishes the event to Kafka ③: hash("vid_8a3f2b") % 8 = 3, so it lands on partition 3.
3. The Flink task that owns partition 3 ④ increments its current-minute Count-Min Sketch and updates its top-K heap, persisting state in RocksDB ⑤.
4. A Kafka Connect sink also copies the raw event to s3://events/2026-05-07/14/part-...parquet for tomorrow's exact analytics ⑨.
5. At the next minute boundary, the Window Aggregator ⑥ flushes that shard's top-K into the ZSET topk:1m:shard3 with TTL 5 minutes ⑦.

Total latency from event-fired to "appears in top-K cache": under 60 seconds (the window flush interval), often much less.
1. A client requests GET /api/v1/topk?window=1h&k=10, which hits the Aggregator API Server ⑧.
2. The server fires 8 parallel ZREVRANGE topk:1h:shard{0..7} 0 9 WITHSCORES calls to Redis ⑦. Each returns that shard's top-10 items with scores. Total Redis time: about 5ms.
3. The server merges the 8 shard lists with a single heap pass and returns the global top 10.

The read path never touches Flink, Kafka, or the cold log. It is one cache lookup and a tiny merge — exactly as fast as a single Redis call would be.
Count-Min Sketch (CMS) is the algorithm that makes everything else possible. It's a 2D array of integer counters with d rows (one per hash function) and w columns. Every item maps to exactly one cell per row — and the magic is in how it handles collisions.
To increment item X: hash X with each of the d hash functions. Each hash gives you a column index. Increment the cell at (row, hash_row(X)) for every row. So a single increment touches exactly d cells — typically 4 or 5.
To query item X's count: hash X again, look up all d cells, return the minimum. Why min? Because every cell holds at least X's true count (X incremented it) plus possibly some other items' counts (collisions). The cell with the fewest collisions gives the tightest upper bound.
The sketch is d × w × 4 bytes. Pick d=5, w=100,000 → 2 MB. That same 2 MB tracks counts for 100 items, 10 million items, or 10 billion items — the size doesn't grow with input cardinality, only with the desired accuracy bounds.
The over-estimation error is bounded by ε = e/w (Euler's e divided by columns) with probability 1 - δ where δ = 1/eᵈ. With d=5, w=100K, total events N: error ≤ 2.7e-5 × N, with probability 99.3%. For 100M events, error ≤ 2,700 — meaningless when the top item has 1.2M views.
Two sketches with the same dimensions and hash functions can be added cell by cell to form the union sketch. This is what lets us shard the input — each Flink task maintains a sketch, and at query time we conceptually merge them (in practice we merge the per-shard top-Ks, which is cheaper).
Because cells only ever increment and the query takes MIN, the worst the sketch can say is "X has more votes than X really does". It will never report a smaller count than the truth. For a leaderboard this is exactly the right asymmetry — a borderline item might be falsely included, never falsely excluded.
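To make the mechanics concrete, here is a minimal Count-Min Sketch in Python. It is a teaching sketch, not the tuned implementation a production Flink job would use, and the seeded-MD5 hashing is just one simple way to get d independent hash functions:

```python
import hashlib

class CountMinSketch:
    def __init__(self, depth: int = 5, width: int = 100_000):
        self.depth, self.width = depth, width
        self.table = [[0] * width for _ in range(depth)]   # d x w counters

    def _cols(self, item: str):
        # One column index per row, derived from a seeded hash of the item.
        for row in range(self.depth):
            digest = hashlib.md5(f"{row}:{item}".encode()).hexdigest()
            yield int(digest, 16) % self.width

    def increment(self, item: str, weight: int = 1) -> None:
        for row, col in enumerate(self._cols(item)):
            self.table[row][col] += weight

    def query(self, item: str) -> int:
        # Every cell is an over-estimate; the minimum is the tightest one.
        return min(self.table[row][col] for row, col in enumerate(self._cols(item)))

    def merge(self, other: "CountMinSketch") -> None:
        # Cell-wise addition yields the sketch of the combined streams.
        for row in range(self.depth):
            for col in range(self.width):
                self.table[row][col] += other.table[row][col]

cms = CountMinSketch()
for _ in range(1_000):
    cms.increment("vid_8a3f2b")
print(cms.query("vid_8a3f2b"))   # >= 1000, usually exactly 1000
```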
The CMS knows everyone's approximate count. We don't want to scan all 1B items to find the top 10 — we want the top 10 to maintain themselves as events stream by. That's a min-heap of size K's job.
```
on event(item):
    cms.increment(item)
    newCount = cms.query(item)
    if heap.size < K:
        heap.push(item, newCount)
    elif newCount > heap.min().count:
        heap.pop()
        heap.push(item, newCount)
```
The heap is keyed by count, with the smallest count at the root (that's what "min-heap of size K" means). Insertion is O(log K). For K = 1000 that's ~10 operations per event — negligible compared to the network and serialization cost of getting the event there.
If item X is already in the heap and arrives again, we shouldn't add it twice — we should update its existing entry's count. Implement the heap as a hash-map-of-handles → heap-positions so you can find and update in O(log K). Lazy alternative: tolerate duplicates and dedupe at read time, simpler but uses more memory.
The CMS within a single window is monotonic — counts only increase. So an item that fell out of the heap can re-enter only by getting more events. When a window rotates, the heap for that window is reset to empty.
Critical: the heap is bounded at K. We do not keep an entry for every item the sketch has seen. The sketch is the "memory of all items"; the heap is the "current top-K shortlist". Memory stays bounded.
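A minimal Python sketch of that bounded shortlist. It uses lazy invalidation of stale heap entries instead of the handle map described above (simpler to read, same bounded-memory idea); a production version would also compact the heap periodically:

```python
import heapq

class TopK:
    """Keeps at most K candidate items; stale heap entries are discarded lazily."""

    def __init__(self, k: int):
        self.k = k
        self.best: dict[str, int] = {}          # item -> latest sketched count (<= K entries)
        self.heap: list[tuple[int, str]] = []   # (count, item), min at the root

    def offer(self, item: str, count: int) -> None:
        if item in self.best:
            self.best[item] = count             # update in place; the old heap entry goes stale
            heapq.heappush(self.heap, (count, item))
        elif len(self.best) < self.k:
            self.best[item] = count
            heapq.heappush(self.heap, (count, item))
        else:
            self._prune()
            min_count, min_item = self.heap[0]
            if count > min_count:               # newcomer beats the current minimum: evict it
                heapq.heappop(self.heap)
                del self.best[min_item]
                self.best[item] = count
                heapq.heappush(self.heap, (count, item))

    def _prune(self) -> None:
        # Drop heap entries that no longer match the item's latest count (or whose item was evicted).
        while self.heap and self.best.get(self.heap[0][1]) != self.heap[0][0]:
            heapq.heappop(self.heap)

    def top(self) -> list[tuple[str, int]]:
        return sorted(self.best.items(), key=lambda e: e[1], reverse=True)

topk = TopK(3)
for item, cnt in [("a", 5), ("b", 9), ("c", 2), ("d", 7), ("a", 12)]:
    topk.offer(item, cnt)
print(topk.top())   # [('a', 12), ('b', 9), ('d', 7)]
```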
Trending isn't all-time popularity — it's recent popularity. So the system must answer queries like "top-K in the last 1 hour" and have those counts naturally decay as time advances.
Don't try to continuously decay counters (that's expensive and arithmetically messy). Instead, keep one sketch per minute-bucket. To answer "last hour", merge the most recent 60 buckets. To answer "last 24 hours", merge the most recent 1,440. To roll the window forward, drop the oldest bucket.
One sketch is 2 MB. 1,440 sketches per shard × 8 shards = 11,520 sketches × 2 MB ≈ ~23 GB across the whole cluster. RocksDB-backed, with only the active hour or so kept in RAM. Old buckets evict to SSD; expired buckets get deleted entirely.
Merging 1,440 sketches per query is wasteful when "last 24h" is the most common window. So we pre-aggregate: every minute, the Window Aggregator ⑥ merges the latest minute into the running 1h, 24h, and all-time sketches and writes their top-K straight to Redis. Queries for those windows hit a single ZSET — no merge required.
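A toy version of the bucket-and-rotate idea, using exact Counters as stand-ins for the per-minute sketches so the example stays self-contained (a real implementation would merge sketches cell-wise instead):

```python
from collections import Counter, deque

class MinuteBuckets:
    """Ring of per-minute counters; 'last hour' = merge of the newest 60 buckets."""

    def __init__(self, max_minutes: int = 1440):
        self.buckets: deque[Counter] = deque(maxlen=max_minutes)  # oldest falls off automatically
        self.buckets.append(Counter())

    def rotate(self) -> None:
        self.buckets.append(Counter())   # start a new current-minute bucket; oldest is evicted

    def record(self, item: str, weight: int = 1) -> None:
        self.buckets[-1][item] += weight

    def topk(self, minutes: int, k: int) -> list[tuple[str, int]]:
        merged = Counter()
        for bucket in list(self.buckets)[-minutes:]:
            merged.update(bucket)        # sketch merge would be cell-wise addition
        return merged.most_common(k)

windows = MinuteBuckets()
windows.record("vid_8a3f2b", 3)
windows.rotate()
windows.record("vid_91e0a4", 5)
print(windows.topk(minutes=60, k=2))     # [('vid_91e0a4', 5), ('vid_8a3f2b', 3)]
```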
The Kafka topic is partitioned. The most important question: partition by what? Two candidates, only one survives.
"All events in minute N go to partition N % 8." Sounds clean for window logic. But: at 100K events/sec all going to one partition for that minute, you've melted that partition and idled the other 7. And the next minute, the hot partition rotates — every consumer is hammered in turn. Total throughput is bottlenecked by one partition.
"Hash the item_id, modulo N." Events for the same item always go to the same partition — so per-shard counts are complete for that item, and the per-shard heap is meaningful. Load is naturally uniform because there are billions of distinct items, way more than partitions. Hot items (viral videos) are still concentrated on one partition, but at most one partition out of N.
What if one item is so viral that its single partition can't keep up? Real example: a Super Bowl halftime moment can drive 50K events/sec for a single video. If our partitions are sized for 12K/sec each, that one item drowns its partition.
Two defenses: (1) sub-key salting for known hot items — append a random 0–9 suffix to spread across 10 partitions, then sum at query time; (2) local pre-aggregation in the producer — buffer events for 100ms locally and ship batched (item_id, count) instead of individual events, reducing the partition load 100×. Most production systems use both.
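A sketch of defense (1), hot-key salting. The hot-item set and the fan-out of 10 are illustrative; in practice the hot set would come from a separate hot-key detector:

```python
import random

SALT_FANOUT = 10
HOT_ITEMS = {"vid_superbowl_halftime"}       # illustrative; maintained by a hot-key detector

def partition_key(item_id: str) -> str:
    # Hot items are spread over 10 sub-keys; everything else keeps its natural key.
    if item_id in HOT_ITEMS:
        return f"{item_id}#{random.randrange(SALT_FANOUT)}"
    return item_id

def unsalted_count(item_id: str, sketch_query) -> int:
    # At query time, a salted item's count is the sum over its sub-keys.
    if item_id in HOT_ITEMS:
        return sum(sketch_query(f"{item_id}#{i}") for i in range(SALT_FANOUT))
    return sketch_query(item_id)

fake_counts = {"vid_superbowl_halftime#0": 5000, "vid_superbowl_halftime#1": 4800}
print(unsalted_count("vid_superbowl_halftime", lambda k: fake_counts.get(k, 0)))  # 9800
```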
Sharding by item_id has one subtle correctness issue: an item that ranks #11 on every shard could in aggregate be #1 globally — but our per-shard heaps only keep top K, so it's invisible to the merge. In practice this is rare because heavy hitters concentrate on one shard (all their events go there). The fix when it matters: keep top 5K per shard, return top 10 globally — over-collect at the shard level so the long tail of "almost-top" items is preserved through the merge.
The whole design above only works because approximate is fine. The Trending page tolerates "video #7 is actually #8" — nobody verifies, nobody complains. But not every "top-K" use case is so forgiving.
When a mis-ranked item costs nothing (trending, leaderboards, top searches), use this design: Kafka + Flink + CMS + min-heap + Redis cache.
When every count is money (ad click billing, for example), approximation stops being acceptable. CMS would over-count by ~0.001%, which is fine for ranking but means "we billed Coca-Cola for 100 extra clicks they didn't get". Here you build an Ad Click Aggregator: Flink with exact per-key counters, idempotent writes via deduplication keys, and a transactional sink. More expensive, but correct to the cent.
Each (window, shard) tuple is one Redis sorted set: topk:1h:shard3 = { vid_8a3f2b: 1240000, vid_91e0a4: 980000, ... }. The Window Aggregator refreshes it every minute (or every window-tick, configurably). TTL is set to 2 × window_size so a temporarily-stalled aggregator doesn't expose users to empty data.
The read path is a pure cache hit — sub-10ms even at 10K QPS. Cache miss is impossible in steady state because the aggregator pre-populates everything; if a key is missing it's a real outage of the aggregator, not a normal cache eviction. We monitor "topk cache miss rate" as a critical alert, not a performance metric.
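With redis-py, the aggregator's write and the API server's read look roughly like this; the key naming mirrors the convention above, and the window and TTL values are illustrative:

```python
import redis

r = redis.Redis(host="localhost", port=6379)

# Window Aggregator: overwrite this shard's precomputed top-K once per minute.
def flush_shard_topk(window: str, shard: int, topk: dict[str, int], ttl_s: int) -> None:
    key = f"topk:{window}:shard{shard}"
    pipe = r.pipeline()
    pipe.delete(key)
    pipe.zadd(key, topk)        # member -> score
    pipe.expire(key, ttl_s)     # 2 x window so a stalled aggregator degrades gracefully
    pipe.execute()

# API server: read one shard's top 10 (the cross-shard merge happens in app code).
def read_shard_topk(window: str, shard: int, k: int = 10):
    return r.zrevrange(f"topk:{window}:shard{shard}", 0, k - 1, withscores=True)

flush_shard_topk("1h", 3, {"vid_8a3f2b": 1_240_000, "vid_91e0a4": 980_000}, ttl_s=7200)
print(read_shard_topk("1h", 3))
```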
Flink checkpoints its full state (sketches + heaps + Kafka offsets) to S3 every 10 seconds. On task failure, Kubernetes restarts the task, which restores from the latest checkpoint and rewinds Kafka to the checkpointed offset. Result: at most 10 seconds of events are reprocessed; zero events are lost.
Sketch state per shard is small (a few GB), so checkpoints are fast — typically under 2 seconds to write to S3. Restore time is similar. End-to-end recovery from a node death: under 30 seconds, and the only visible effect is that the freshest ~10-second slice of counts is briefly stale while it is recomputed from the rewound Kafka offset.
Kafka itself is replicated 3× across availability zones, so a single broker death is invisible. A whole-cluster outage is a different beast — but in that case the API Gateway buffers events in a small local queue and refuses ingest after a few seconds, while the read path keeps serving stale-but-correct top-K from Redis. Trending degrades gracefully: it stops updating but doesn't go blank.
If we discover a counting bug or want to add a new window (say, a 7-day one), the Replay Service ⑩ streams events from S3 back through a parallel Flink cluster, builds the new sketches from scratch, and atomically swaps them into Redis when complete. The live system is never disturbed.
A few closing deep-dive answers worth having ready:

- Over-collect at the shard level: keep the top (K × 50) per shard, return top K globally. Costs ~50KB per heap; eliminates the long-tail miss.
- Exact counting for billing: every event carries a unique event_id; Flink keeps a Bloom filter (or short-TTL key-value store) of recently-seen IDs to drop duplicates from at-least-once Kafka delivery. Counters are stored exactly in RocksDB (one per ad_id), updated transactionally, and aggregated to a transactional sink (e.g. Postgres or Iceberg). Costs more memory and CPU than CMS, but gives you the cents-accurate counts that billing requires. This system is usually called an "Ad Click Aggregator".
- Sizing the sketch: the over-estimate is bounded by ε × N with probability 1 - δ, where w = ⌈e/ε⌉ and d = ⌈ln(1/δ)⌉. For our case: 100M events per minute, want error under 0.01% of total = 10K, want 99.9% confidence. So ε = 1e-4, δ = 1e-3 → w ≈ 27,200, d ≈ 7. Round up to w=32K, d=8 → sketch size = 1MB. Cheap; the snippet below reproduces the arithmetic.
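The dimensioning arithmetic from that last bullet, as a tiny helper:

```python
import math

def cms_dimensions(epsilon: float, delta: float) -> tuple[int, int]:
    """Width and depth for over-estimate <= epsilon * N with probability >= 1 - delta."""
    w = math.ceil(math.e / epsilon)
    d = math.ceil(math.log(1 / delta))
    return w, d

w, d = cms_dimensions(epsilon=1e-4, delta=1e-3)
print(w, d)                             # 27183 columns, 7 rows
print(f"{w * d * 4 / 1e6:.1f} MB")      # ~0.8 MB; rounded up to 32K x 8 it's 1 MB
```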