High-Level Design

Top-K Service — Trending Now

From "count everything and sort" to a sharded, sketched, sub-100ms ranking engine — how YouTube Trending, Twitter top tweets, and Google top searches actually compute the leaderboard

Read this with the framework in mind

This deep-dive applies the 4-step HLD interview framework. As you read, map each section to Requirements → Entities → APIs → High-Level Design → Deep Dives, and notice which of the 8 common patterns and key technologies are at play.

Step 1

What is a Top-K Service?

Imagine Raj opens YouTube on a Saturday morning. The homepage greets him with a row labeled "Trending Now" — ten thumbnails, ranked. In the eight seconds it took him to launch the app, somewhere around 800,000 other people clicked play on something. To put ten thumbnails on his screen, a system has to answer: out of the millions of videos that got viewed in the last hour, which ten got viewed the most? The naive answer — "count every view, sort all videos by count, take the top ten" — is mathematically right and operationally impossible. There are billions of videos and millions of views per second. You will not finish counting, let alone sorting, before Raj closes the app and goes to make coffee.

This pattern shows up everywhere on the modern web: Twitter Trending Topics (top hashtags in the last 5 minutes), Google Top Searches (most-searched queries today), Reddit r/all (top posts right now), Spotify Daily Top 50, Amazon Best Sellers, fraud-detection top suspicious IPs. They are all the same problem in disguise — a firehose of events arrives, and someone needs the top K most frequent items over a time window.

The two questions that drive every design decision below: (1) How do we count events from a 100,000-events-per-second firehose without storing every event forever? (2) How do we serve the top-10 leaderboard in under 100ms while the underlying counts are constantly moving?

The trick the industry settled on is a beautiful one: give up on exact answers. Nobody on Earth verifies whether the trending video at rank 7 is really the seventh most-watched or actually the eighth. Once you accept "approximate is fine", a two-decade-old algorithm called Count-Min Sketch lets you count billions of distinct items in a few megabytes of RAM. Pair it with a min-heap, and the leaderboard practically maintains itself.

Step 2

Requirements & Goals

Before drawing a single box, pin down what the system must do. Top-K looks deceptively simple ("just count and sort, right?"); the requirements are where you reveal you understand why it isn't.

✅ Functional Requirements

  • Ingest a stream of events (item_id, timestamp) — video views, tweets, search queries
  • Serve getTopK(window, k) for windows of 1 minute, 1 hour, 24 hours, all-time
  • K is small — typically 10 to 1000 (nobody asks for "top 10 million")
  • Ranking by raw count is the default; weighted rankings (likes × 2 + shares × 5) are a stretch goal

⚙️ Non-Functional Requirements

  • Low query latency — getTopK p99 under 100ms (it's on the homepage)
  • Approximate is OK — small ranking errors are invisible to users
  • Throughput — handle 100K events/sec sustained, 1M/sec at peak
  • Cardinality — billions of distinct items in the long tail
  • Fault tolerant — no event drops, survives single-node failures
Out of scope (explicitly): exact counts (use a separate Ad Click Aggregator if you need them for billing), per-user personalized trending (that's a recommendations system, not a Top-K), and per-item drill-down history ("show me this video's hourly view chart" — that's an analytics warehouse problem). Mixing these in is the most common interview mistake.
Step 3

Capacity Estimation & Constraints

Numbers drive every architectural choice in Top-K — they are the entire reason the algorithm changes from "naive sort" to "sketch + heap". Do them out loud.

Traffic estimates (using a YouTube-scale model)

Assume 100,000 video views per second sustained, peaking at 1M/sec during Super Bowl moments. Assume 1 billion distinct videos in the catalog (long tail of historical content). Assume the Trending page is loaded 10,000 times per second globally.

Metric         | Value          | Notes
Event ingest   | 100K events/s  | Steady state, peaks at 1M
Distinct items | ~1B items      | Total catalog cardinality
getTopK QPS    | 10K req/s      | Trending page loads
K range        | 10 – 1000      | Page sizes

Storage estimate — and why it forces approximation

If we logged every single event with (item_id, timestamp) = 16 bytes: 100K × 16B × 86,400s = ~138 GB / day = ~50 TB / year. We do keep this in cold storage for offline analytics — but using it to answer real-time top-K means scanning terabytes per query, which is hopeless.

If we kept exact counts per item: 1B items × (8B id + 8B counter + window metadata) ≈ ~30 GB. Tractable for single-tier storage — but updating and reading at 100K updates/sec while sorting 1B rows on every query is what kills it.

With a Count-Min Sketch approximation: 5 hash functions × 100K columns × 4B counter = ~2 MB per sketch, regardless of how many distinct items show up. We can keep 1,440 sketches (one per minute of the last 24 hours) in ~3 GB of RAM per node. That's the magic.

Metric      | Value     | Why it matters
Ingest rate | 100K/s    | Forces stream-processing; one box can't keep up
Cardinality | 1B items  | Forces probabilistic counters; per-item map is too big to update fast
Top-K QPS   | 10K/s     | Forces a precomputed cache; cannot recompute on every read
Sketch size | 2 MB      | Why approximation is cheap — fits in L2/L3 cache
Cold log    | 50 TB/yr  | Goes to S3 for offline exact analytics, not real-time
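
To sanity-check these numbers yourself, here is the arithmetic as a few lines of Python. The constants are exactly the assumptions stated above (16-byte events, a 5 × 100K sketch of 4-byte counters, one sketch per minute for 24 hours); nothing here is new data.

# Back-of-envelope check of the estimates above
EVENTS_PER_SEC   = 100_000
EVENT_BYTES      = 16                                         # (item_id, timestamp)
raw_log_per_day  = EVENTS_PER_SEC * EVENT_BYTES * 86_400      # ≈ 138 GB
raw_log_per_year = raw_log_per_day * 365                      # ≈ 50 TB

D, W            = 5, 100_000                                  # hash functions × columns
sketch_bytes    = D * W * 4                                   # 4-byte counters ≈ 2 MB
day_of_sketches = 1_440 * sketch_bytes                        # one sketch per minute of 24h ≈ 2.9 GB

print(f"raw log: {raw_log_per_day/1e9:.0f} GB/day, {raw_log_per_year/1e12:.1f} TB/yr")
print(f"sketch: {sketch_bytes/1e6:.1f} MB each, {day_of_sketches/1e9:.1f} GB per 24h of minutes")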
Step 4

System APIs

Two endpoints carry all the traffic: one to ingest events (write path, very high QPS), one to read the top-K (read path, much lower QPS but must be fast).

REST API surface
// Ingest — write path, 100K/sec
// Almost always called server-to-server (not directly from browsers)
POST /api/v1/events
{
  "item_id":   "vid_8a3f2b",
  "timestamp": 1715072526123,
  "weight":    1                  // optional, default 1
}
→ 202 Accepted   // fire-and-forget, returns immediately

// Read — getTopK, 10K/sec, sub-100ms
GET /api/v1/topk?window=1h&k=10
→ 200 OK
{
  "window":    "1h",
  "generated": "2026-05-07T14:02:30Z",
  "items": [
    { "item_id": "vid_8a3f2b", "approx_count": 1240000 },
    { "item_id": "vid_91e0a4", "approx_count":  980000 },
    ...
  ]
}
Why 202 Accepted for ingest, not 200 OK? The client (the watch-page service that fired the event) must not be blocked waiting for our pipeline to actually count it. We acknowledge receipt the moment the event lands in Kafka — counting happens asynchronously. If we made the client wait, a Kafka hiccup would slow down every video page on YouTube, which is unacceptable.
Why no getCount(item_id) endpoint? Because Count-Min Sketches store counts cell-collision-style — you can ask "how many views does this specific video have", but the answer over-estimates. For per-item exact counts you'd hit the warehouse. The sketch is optimized for "what are the top items", not "how many views did this item get".
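
To make the fire-and-forget contract concrete, here is a minimal sketch of what the ingest handler could look like. FastAPI and kafka-python are illustrative stand-ins (the article doesn't prescribe a framework), and the topic name, broker address, and ±5-minute timestamp bound are assumptions, not part of the design above.

import json
from time import time
from fastapi import FastAPI, HTTPException
from kafka import KafkaProducer
from pydantic import BaseModel

app = FastAPI()
producer = KafkaProducer(bootstrap_servers="kafka:9092",                 # assumed address
                         value_serializer=lambda v: json.dumps(v).encode())

class Event(BaseModel):
    item_id: str
    timestamp: int          # epoch millis
    weight: int = 1

@app.post("/api/v1/events", status_code=202)
def ingest(event: Event):
    # Validate: timestamp within sane bounds (here, ±5 minutes of server time)
    if abs(event.timestamp / 1000 - time()) > 300:
        raise HTTPException(status_code=400, detail="timestamp out of range")
    # Key by item_id so every event for one item lands on the same Kafka partition,
    # then acknowledge immediately; counting happens downstream, asynchronously.
    producer.send("events", key=event.item_id.encode(), value=event.dict())
    return {"status": "accepted"}

The point is the shape, not the libraries: validate, publish, return 202, never wait for the count.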
Step 5

Algorithms — The Core Problem

Forget the architecture for a second. The hard part of Top-K is algorithmic: how do you compute the top K from a high-velocity stream over billions of items, in bounded memory, in milliseconds? Three approaches are worth weighing — only one survives.

❌ Naive — count in DB, sort on query

Keep (item_id, count) rows in Postgres. On every event, UPDATE counts SET count=count+1 WHERE item_id=X. On every getTopK, SELECT * ORDER BY count DESC LIMIT K.

Why it fails: 1B-row sort on every read = seconds per query. 100K writes/sec on a single Postgres = lock contention meltdown. And there is no notion of a sliding time window — counts are forever.

⚠️ Heap-based exact

Keep a min-heap of size K. On every event, look up the item's current exact count, push into the heap if it beats the heap's minimum.

Why it fails: O(log K) per event is fast, but to know "current count" you still need an exact per-item counter — back to the 1B-row map problem. And in a sharded world the per-shard heap doesn't compose into the global top-K cleanly when an item is split across shards.

✅ Count-Min Sketch + Min-Heap

Use a probabilistic counter (Count-Min Sketch) of fixed size — a few MB regardless of how many items show up. Keep a min-heap of size K alongside. On every event: increment the sketch, then check if the item's sketched count beats the heap min — if yes, swap.

Why it wins: O(1) update, O(log K) heap maintenance, fixed memory, mergeable across shards. Trade-off: counts over-estimate by a bounded amount with bounded probability. Acceptable for ranking.

The mental model that makes this click: Count-Min Sketch is like an approximate vote tally that always overcounts but never undercounts. Min-Heap is like a leaderboard with constant-size memory — the moment a contender beats the lowest-ranked person on the board, it kicks them off. Combined, you have a leaderboard where the count next to each name is "at most this many votes, possibly fewer" — accurate enough to rank, cheap enough to maintain at firehose speed.
Step 6 · CORE

High-Level Architecture — From Naive to Production

This is the section that wins or loses the interview. We build the architecture in three passes: the simplest thing that could plausibly work, why it falls apart, and the production shape where every box justifies itself. The numbers from Step 3 drive every box.

Pass 1 — The naive design (and why it breaks)

Sketch the simplest possible system: a single counter table in Postgres, updated by every event, sorted by every query.

flowchart LR
  C["Event Source"] --> APP["App Server"]
  APP -->|"UPDATE counts SET count=count+1"| DB[("Postgres")]
  USER["User Browser"] -->|"SELECT * ORDER BY count DESC LIMIT 10"| APP

Three concrete failures emerge the moment real traffic shows up:

💥 Write contention at 100K/sec

Postgres tops out around 5K–10K row updates per second on commodity hardware. We need 100K. The counts table becomes a lock-contention zoo — viral items have their row updated thousands of times per second, blocking each other. The DB CPU pegs at 100% and writes start timing out.

💥 Sort across 1B rows per query

Every getTopK does ORDER BY count DESC LIMIT 10 over 1B rows. Even with an index on count, maintaining that index under 100K updates/sec is itself a workload — and ordered scans across that many rows take seconds. Your homepage is now a stopwatch.

💥 No sliding window concept

The counter is monotonically increasing forever. If a video went viral two years ago and got 100M views, it dominates "trending today" forever. There's no notion of "decay" or "last hour". Adding one means storing per-event timestamps, which dumps us back into the 50 TB/year log problem.

Pass 2 — The mental model: Two-stage pipeline with probabilistic compression

Three insights flip the design from "impossible" to "elegant":

① Decouple write and read

Writes go into an event stream (Kafka). Reads hit a precomputed top-K cache. Counting happens between them in stream processors. No request ever blocks on the other workload's resources. This is the same write-path/read-path split that the URL shortener uses, only here the "count and rank" computation is what bridges the two.

② Approximate counting in fixed memory

Per-shard Count-Min Sketch tracks counts in a few MB regardless of how many distinct items appear. Per-shard min-heap of size K always knows that shard's current top-K contenders. Memory cost stops growing; compute cost per event stays O(log K).

③ Time-bucketed sketches for sliding windows

Don't try to "decay" counts continuously — instead keep one sketch per minute (so 60 sketches cover the last hour, 1440 cover last 24h). Querying "last hour" merges the most recent 60 sketches. Old sketches age out and free their memory.

The Kafka topic is partitioned by item_id using consistent hashing. Each Flink task owns one partition's full sketch + heap state. At query time, we read each shard's locally-computed top-K, merge them in the API server, and return the global top-K. Approximate at every step, but correct enough for a homepage.

Pass 3 — The production shape

Now the full picture. Every node is numbered — find its matching card below to see what it does and what would break without it.

flowchart TB
  CL["① Event Sources — Watch Page · App · Search Box"]
  subgraph EDGE["Edge / Ingest"]
    GW["② API Gateway"]
    KAFKA["③ Kafka — partitioned by item_id"]
  end
  subgraph AGG["Aggregation Plane"]
    FLINK["④ Stream Processor — Flink / Kafka Streams"]
    STATE[("⑤ Per-Shard State — RocksDB · CMS + Heap")]
    WIN["⑥ Window Aggregator — flushes per-minute"]
  end
  subgraph QUERY["Query Plane"]
    CACHE[("⑦ Top-K Cache — Redis ZSET, sharded")]
    API["⑧ Aggregator API Server"]
  end
  subgraph COLD["Cold / Ops"]
    S3[("⑨ Cold Event Log — S3 / Warehouse")]
    REPLAY["⑩ Replay Service — backfill, sketch rebuild"]
    MON["⑪ Monitoring — sketch error, heap accuracy"]
  end
  USER["End User Browser"]
  CL --> GW
  GW --> KAFKA
  KAFKA --> FLINK
  FLINK --> STATE
  FLINK --> WIN
  WIN --> CACHE
  KAFKA --> S3
  USER -->|"GET /topk"| API
  API --> CACHE
  REPLAY -.replay.-> KAFKA
  FLINK -.metrics.-> MON
  CACHE -.metrics.-> MON
  style CL fill:#e8743b,stroke:#e8743b,color:#fff
  style GW fill:#171d27,stroke:#9b72cf,color:#d4dae5
  style KAFKA fill:#171d27,stroke:#d4a838,color:#d4dae5
  style FLINK fill:#171d27,stroke:#4a90d9,color:#d4dae5
  style STATE fill:#171d27,stroke:#4a90d9,color:#d4dae5
  style WIN fill:#171d27,stroke:#4a90d9,color:#d4dae5
  style CACHE fill:#171d27,stroke:#3cbfbf,color:#d4dae5
  style API fill:#171d27,stroke:#38b265,color:#d4dae5
  style S3 fill:#171d27,stroke:#9b72cf,color:#d4dae5
  style REPLAY fill:#171d27,stroke:#d4a838,color:#d4dae5
  style MON fill:#171d27,stroke:#e05252,color:#d4dae5
  style USER fill:#171d27,stroke:#38b265,color:#d4dae5

Component-by-component — what each numbered box does

Use the numbers in the diagram above to find the matching card. Each one answers what is this, why is it here, and what would break without it.

Event Sources

The internal services that produce the firehose: the YouTube watch-page service ("a play just started"), the Twitter post service ("a tweet was retweeted"), the Search service ("a query was issued"). Each one fires a small JSON event whenever the relevant action occurs. These are internal producers — events are not posted directly from browsers because that lets users tamper with counts.

Solves: nothing on its own — but the entire pipeline downstream exists to absorb their throughput. The fact that they emit fire-and-forget (they don't wait for our acknowledgment to render the user's action) is what lets the rest of the system be asynchronous.

API Gateway

The thin HTTP layer in front of Kafka. Validates the event payload (does it have item_id and timestamp? is the timestamp within sane bounds?), authenticates the producer service, applies per-source rate limits, then publishes to Kafka. Returns 202 in microseconds.

Solves: protecting Kafka from malformed traffic and keeping the producer-facing API stable while the internal stream evolves. Without it, every producer would have to embed Kafka client libs and know broker topology — operationally a nightmare.

Kafka — partitioned by item_id

The durable event log that buffers everything. Partitioned by hash(item_id) % N, so all events for the same item land on the same partition — that's what makes per-shard counting possible. Configured with 3× replication so a broker death loses zero events. Retention is short for the hot tier (24 hours is enough — the cold log handles long-term storage).

Solves: everything. Decouples producers from consumers (producers don't care if Flink is healthy). Provides replay (restart Flink from any offset). Provides natural sharding (each partition is one consumer's whole world). Without Kafka, ingest spikes would directly hammer the stream processors and a Flink restart would lose data.

Stream Processor (Flink / Kafka Streams)

The brain. A cluster of Flink jobs, one task per Kafka partition. Each task consumes events for its partition and updates two pieces of state: (a) a Count-Min Sketch for the current minute's counts, and (b) a min-heap of the top K items observed in the current window. Every event is one O(1) sketch increment plus one O(log K) heap maintenance — fast enough to keep up with the partition's share of 100K/sec.

Solves: the actual counting. Without Flink (or an equivalent stream processor), we'd be storing every event raw and recomputing top-K from scratch on every query — back to the impossible naive design. Flink's checkpointing also provides exactly-once semantics so we don't double-count on restart.

Per-Shard State Store (RocksDB)

Embedded RocksDB inside each Flink task holds the per-minute sketches and heaps. Why RocksDB and not pure heap memory? Because over a 24-hour sliding window we keep 1,440 sketches per shard — that's a few GB per shard, more than fits comfortably in a JVM heap. RocksDB stores it on local SSD with an in-memory cache for the active sketches.

Solves: holding sketch state larger than RAM, and persisting it across crashes. Combined with Flink's periodic checkpoints to S3, this means a Flink task can die and resume in seconds without losing its count history.

Window Aggregator

A timer-driven function inside Flink that fires once per minute (and once per hour, once per day, depending on the windows we serve). When it fires, it snapshots the current sketch and heap, computes that shard's top-K for that window, and writes it to the Top-K Cache. Old sketches that age out of all configured windows are deleted to free memory.

Solves: the gap between "Flink has live state" and "the API server can read a fast precomputed answer". Without this aggregator, every getTopK would have to ask Flink to compute on demand — adding hundreds of milliseconds and coupling read latency to write throughput.

Top-K Cache (Redis ZSET, sharded)

A Redis cluster holding precomputed per-shard top-K results, one entry per (window, shard) tuple. Stored as Redis sorted sets (ZSETs) so an API server can read the top K of one shard in a single ZREVRANGE call. TTL matches the window length so stale data evicts itself. Sharded across multiple Redis nodes; replicated for fault tolerance.

Solves: sub-100ms read latency. Every getTopK becomes a few cache reads + a merge — never touches Flink, never touches the cold log. Without this, the read path would be coupled to Flink's processing latency, which spikes whenever event throughput spikes (exactly when users are most likely to load the trending page).

Aggregator API Server

The stateless service that answers GET /api/v1/topk. On each request it reads each shard's precomputed top-K from Redis (e.g. 8 ZREVRANGE calls in parallel), merges them with a single global heap pass, and returns the top K. Total work: 8 cache reads + an 8K-element merge = under 30ms typical.

Solves: stitching per-shard approximate top-Ks into a global answer. Why merge in the API tier and not in Redis? Because the merge needs cross-shard heap logic that's awkward to express in Redis Lua, and the merge cost (8K items max) is cheap enough to do per request.

Cold Event Log (S3 / Data Warehouse)

A second consumer of the Kafka topic — a Kafka Connect S3 sink — writes the raw event stream to Parquet files in S3, partitioned by hour. Downstream, a Spark / BigQuery job runs nightly to compute exact top-K, generate dashboards, and feed the recommendations team. The hot path never reads this; only offline analytics does.

Solves: exact counts when we need them (auditing the approximate pipeline, computing weekly leaderboards where exact ranking matters), and the ability to replay events through Flink if we change the sketch parameters or fix a counting bug.

Replay Service

An on-demand tool that re-streams events from S3 (or from an arbitrary Kafka offset) back through a fresh Flink job, used to rebuild sketches after a parameter change ("we want 7 hash functions instead of 5") or to backfill a new time window ("add a 7-day window"). Runs on a separate Flink cluster so it doesn't disturb production.

Solves: the "we changed our mind about sketch parameters" problem, which would otherwise require taking the live system down. Without replay, every algorithmic change would be a one-way migration.

Monitoring

Tracks the metrics that matter for an approximate system: sketch over-estimation rate (compared periodically against exact counts from the warehouse), per-shard heap accuracy, Kafka consumer lag, Flink checkpoint duration, Redis hit rate. Alerts fire when over-estimation drifts above the configured ε bound.

Solves: the special debugging burden of probabilistic systems — when the trending list looks "off", you need to know whether the sketch widened its error or whether something else broke. Without dedicated monitoring, you can't tell.

Concrete walkthrough — Raj watches a viral video, then opens Trending

Two flows, mapped to the numbered components above.

✍️ Ingest flow — 14:02:06, Raj presses play

  1. The watch-page service ① fires POST /events { item_id: vid_8a3f2b, timestamp: ... } to the API Gateway ②. Returns 202 in 5ms — Raj's video starts playing instantly, no blocking.
  2. Gateway publishes the event to Kafka ③. hash("vid_8a3f2b") % 8 = 3, so it lands on partition 3.
  3. Flink task ④ for partition 3 consumes the event. It updates its current-minute Count-Min Sketch ⑤ — incrementing 5 cells (one per hash function). It then queries the sketch for vid_8a3f2b's count, sees it's now 1,240,001, and updates its min-heap ⑤ — vid_8a3f2b was already in the top-K, so just bumps its score.
  4. In parallel, the Kafka S3 sink writes the raw event to s3://events/2026-05-07/14/part-...parquet for tomorrow's exact analytics ⑨.
  5. At 14:03:00, the Window Aggregator ⑥ fires. Snapshots partition 3's top-K and writes it to Redis ⑦ as ZSET topk:1m:shard3 with TTL 5 minutes.

Total latency from event-fired to "appears in top-K cache": under 60 seconds (the window flush interval), often much less.

📖 Read flow — 14:02:30, a user opens the Trending page

  1. Browser GETs /api/v1/topk?window=1h&k=10, hits the Aggregator API Server ⑧.
  2. API server fires 8 parallel ZREVRANGE topk:1h:shard{0..7} 0 9 WITHSCORES calls to Redis ⑦. Each returns that shard's top-10 items with scores. Total Redis time: about 5ms.
  3. API server merges the 80 candidates into a single max-heap and pops the top 10 — about 1ms of CPU.
  4. Returns JSON with the top 10 video IDs and their approximate counts. End-to-end latency: about 30ms.

The read path never touches Flink, Kafka, or the cold log. It is one cache lookup and a tiny merge — exactly as fast as a single Redis call would be.
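
As a sketch of what the Aggregator API Server ⑧ runs per request: read each shard's ZSET, sum any item that shows up on more than one shard (which only happens for salted hot keys), and take a single global heap pass. redis-py is an illustrative choice; the key pattern and broker address are assumptions consistent with the walkthrough above.

import heapq
import redis

r = redis.Redis(host="redis", port=6379)      # assumed address

def get_top_k(window: str, k: int, num_shards: int = 8):
    candidates = {}
    for shard in range(num_shards):
        # Each shard's precomputed leaderboard, highest score first
        for member, score in r.zrevrange(f"topk:{window}:shard{shard}", 0, k - 1, withscores=True):
            item = member.decode()
            candidates[item] = candidates.get(item, 0) + int(score)
    # One heap pass over at most num_shards * k candidates
    return heapq.nlargest(k, candidates.items(), key=lambda kv: kv[1])

# get_top_k("1h", 10) -> [("vid_8a3f2b", 1240000), ("vid_91e0a4", 980000), ...]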

So what: the architecture is built around three insights — (1) writes and reads are decoupled by Kafka, so a 10× ingest spike doesn't slow down getTopK; (2) counts live in fixed-size probabilistic sketches, so 1B distinct items don't blow up memory; (3) per-shard top-K is precomputed and cached, so the read path is always one-cache-lookup fast. Every box in the diagram exists to remove one of the failure modes from Pass 1.
Step 7

Count-Min Sketch — Deep Dive

Count-Min Sketch (CMS) is the algorithm that makes everything else possible. It's a 2D array of integer counters with d rows (one per hash function) and w columns. Every item maps to exactly one cell per row — and the magic is in how it handles collisions.

How it works in 30 seconds

To increment item X: hash X with each of the d hash functions. Each hash gives you a column index. Increment the cell at (row, hash_row(X)) for every row. So a single increment touches exactly d cells — typically 4 or 5.

To query item X's count: hash X again, look up all d cells, return the minimum. Why min? Because every cell holds at least X's true count (X incremented it) plus possibly some other items' counts (collisions). The cell with the fewest collisions gives the tightest upper bound.

flowchart TB
  EVENT["Event: item X arrives"]
  EVENT --> H1["hash_1 X = column 7"]
  EVENT --> H2["hash_2 X = column 3"]
  EVENT --> H3["hash_3 X = column 12"]
  EVENT --> H4["hash_4 X = column 5"]
  EVENT --> H5["hash_5 X = column 9"]
  H1 --> R1["Row 1 col 7: ++"]
  H2 --> R2["Row 2 col 3: ++"]
  H3 --> R3["Row 3 col 12: ++"]
  H4 --> R4["Row 4 col 5: ++"]
  H5 --> R5["Row 5 col 9: ++"]
  R1 --> Q["Query count of X = MIN of all 5 cells"]
  R2 --> Q
  R3 --> Q
  R4 --> Q
  R5 --> Q
  style EVENT fill:#e8743b,stroke:#e8743b,color:#fff
  style Q fill:#171d27,stroke:#38b265,color:#d4dae5
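
A minimal, self-contained version of the structure just described. Real implementations use pairwise-independent hash families; seeding MD5 per row, as below, is just a convenient stand-in for illustration.

import hashlib

class CountMinSketch:
    def __init__(self, depth: int = 5, width: int = 100_000):
        self.depth, self.width = depth, width
        self.table = [[0] * width for _ in range(depth)]

    def _cells(self, item: str):
        # One column per row, derived from a stable per-row hash of the item
        for row in range(self.depth):
            digest = hashlib.md5(f"{row}:{item}".encode()).hexdigest()
            yield row, int(digest, 16) % self.width

    def increment(self, item: str, count: int = 1):
        for row, col in self._cells(item):
            self.table[row][col] += count

    def query(self, item: str) -> int:
        # Every cell holds at least the true count (plus collisions), so MIN is the tightest bound
        return min(self.table[row][col] for row, col in self._cells(item))

    def merge(self, other: "CountMinSketch"):
        # Sketches with identical dimensions and hash functions add cell by cell
        for row in range(self.depth):
            for col in range(self.width):
                self.table[row][col] += other.table[row][col]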

Why this is a beautiful trade-off

📐 Fixed memory regardless of cardinality

The sketch is d × w × 4 bytes. Pick d=5, w=100,000 → 2 MB. That same 2 MB tracks counts for 100 items, 10 million items, or 10 billion items — the size doesn't grow with input cardinality, only with the desired accuracy bounds.

📊 Bounded error

The over-estimation error is bounded by ε = e/w (Euler's e divided by columns) with probability 1 - δ where δ = 1/eᵈ. With d=5, w=100K, total events N: error ≤ 2.7e-5 × N, with probability 99.3%. For 100M events, error ≤ 2,700 — meaningless when the top item has 1.2M views.

↔️ Mergeable across shards

Two sketches with the same dimensions and hash functions can be added cell by cell to form the union sketch. This is what lets us shard the input — each Flink task maintains a sketch, and at query time we conceptually merge them (in practice we merge the per-shard top-Ks, which is cheaper).

⚠️ Always over-counts, never under-counts

Because cells only ever increment and the query takes MIN, the worst the sketch can say is "X has more votes than it really does". It will never report a smaller count than the truth. For a leaderboard this is exactly the right asymmetry — a borderline item might be falsely included, never falsely excluded.

The interview move: CMS is often confused with Bloom filters. Both use multiple hash functions and a fixed array. The difference: Bloom answers "is X in the set?" (yes/no), CMS answers "how many times has X been seen?" (a number). Bloom uses bits; CMS uses counters. Both can give false positives; only CMS gives an actual count.
Step 8

Min-Heap for Top-K

The CMS knows everyone's approximate count. We don't want to scan all 1B items to find the top 10 — we want the top 10 to maintain themselves as events stream by. That's a min-heap of size K's job.

The algorithm, in a few lines

# A runnable rendering of the per-event hot path (Python; assumes a cms object
# with increment/query, as sketched in Step 7)
import heapq

def on_event(item, cms, heap, K):
    cms.increment(item)
    new_count = cms.query(item)                      # approximate count, never an under-count
    if len(heap) < K:
        heapq.heappush(heap, (new_count, item))      # heap of (count, item): smallest count at heap[0]
    elif new_count > heap[0][0]:
        heapq.heapreplace(heap, (new_count, item))   # evict the current minimum, admit the newcomer

The heap is keyed by count, with the smallest count at the root (that's what "min-heap of size K" means). Insertion is O(log K). For K = 1000 that's ~10 operations per event — negligible compared to the network and serialization cost of getting the event there.

Subtleties that bite you in production

🔁 Duplicate items in the heap

If item X is already in the heap and arrives again, we shouldn't add it twice — we should update its existing entry's count. Implement the heap as a hash-map-of-handles → heap-positions so you can find and update in O(log K). Lazy alternative: tolerate duplicates and dedupe at read time, simpler but uses more memory.

📉 Counts only go up

The CMS within a single window is monotonic — counts only increase. So an item that fell out of the heap can re-enter only by getting more events. When a window rotates, the heap for that window is reset to empty.

🧮 Heap holds K, not all candidates

Critical: the heap is bounded at K. We do not keep an entry for every item the sketch has seen. The sketch is the "memory of all items"; the heap is the "current top-K shortlist". Memory stays bounded.

Step 9

Sliding Windows — Counting "the Last Hour"

Trending isn't all-time popularity — it's recent popularity. So the system must answer queries like "top-K in the last 1 hour" and have those counts naturally decay as time advances.

Approach — bucketed sketches

Don't try to continuously decay counters (that's expensive and arithmetically messy). Instead, keep one sketch per minute-bucket. To answer "last hour", merge the most recent 60 buckets. To answer "last 24 hours", merge the most recent 1,440. To roll the window forward, drop the oldest bucket.

flowchart LR
  subgraph Buckets["Per-Minute Sketches — last 60 minutes"]
    B0["minute 0<br/>sketch + heap"]
    B1["minute 1<br/>sketch + heap"]
    B2["minute 2<br/>sketch + heap"]
    BN["...<br/>minute 59"]
  end
  Q1["Query last 5m"] --> M1["MERGE buckets 55-59"]
  Q2["Query last 1h"] --> M2["MERGE all 60 buckets"]
  Q3["Query last 1m"] --> M3["READ bucket 59 only"]
  style B0 fill:#171d27,stroke:#9b72cf,color:#d4dae5
  style B1 fill:#171d27,stroke:#9b72cf,color:#d4dae5
  style B2 fill:#171d27,stroke:#9b72cf,color:#d4dae5
  style BN fill:#171d27,stroke:#9b72cf,color:#d4dae5

Memory cost — and why it's tiny

One sketch is 2 MB. 1,440 sketches per shard × 8 shards = 11,520 sketches × 2 MB ≈ ~23 GB across the whole cluster. RocksDB-backed, with only the active hour or so kept in RAM. Old buckets evict to SSD; expired buckets get deleted entirely.

Pre-aggregating common windows

Merging 1,440 sketches per query is wasteful when "last 24h" is the most common window. So we pre-aggregate: every minute, the Window Aggregator ⑥ merges the latest minute into the running 1h, 24h, and all-time sketches and writes their top-K straight to Redis. Queries for those windows hit a single ZSET — no merge required.

Why bucketed beats continuous decay: continuous-decay schemes (like exponentially-weighted moving averages) work but are arithmetically expensive on every event and hard to make exactly correct under failures. Buckets are dumber and more robust — counting in a bucket is just CMS, rolling a window is just dropping a bucket. Boring and reliable beats clever and broken.
Step 10

Sharding — Why item_id, Not Time

The Kafka topic is partitioned. The most important question: partition by what? Two candidates, only one survives.

❌ Partition by time bucket

"All events in minute N go to partition N % 8." Sounds clean for window logic. But: at 100K events/sec all going to one partition for that minute, you've melted that partition and idled the other 7. And the next minute, the hot partition rotates — every consumer is hammered in turn. Total throughput is bottlenecked by one partition.

✅ Partition by item_id (consistent hash)

"Hash the item_id, modulo N." Events for the same item always go to the same partition — so per-shard counts are complete for that item, and the per-shard heap is meaningful. Load is naturally uniform because there are billions of distinct items, way more than partitions. Hot items (viral videos) are still concentrated on one partition, but at most one partition out of N.

The hot-key problem (and how to defuse it)

What if one item is so viral that its single partition can't keep up? Real example: a Super Bowl halftime moment can drive 50K events/sec for a single video. If our partitions are sized for 12K/sec each, that one item drowns its partition.

Two defenses: (1) sub-key salting for known hot items — append a random 0–9 suffix to spread across 10 partitions, then sum at query time; (2) local pre-aggregation in the producer — buffer events for 100ms locally and ship batched (item_id, count) instead of individual events, reducing the partition load 100×. Most production systems use both.
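
Defense (1) in miniature. The fan-out of 10 and the way the hot-item set is maintained are assumptions for illustration; the mechanism is what matters: the producer spreads a known-hot item over several partition keys, and the query side sums the salted sub-keys back into one count.

import random

SALT_FANOUT = 10
HOT_ITEMS = {"vid_8a3f2b"}       # maintained out of band, e.g. from the previous window's top-K

def partition_key(item_id: str) -> str:
    # Hot items get a 0-9 suffix so their events spread across up to 10 partitions
    if item_id in HOT_ITEMS:
        return f"{item_id}#{random.randrange(SALT_FANOUT)}"
    return item_id

def unsalted_count(cms, item_id: str) -> int:
    # At query time, fold the salted sub-keys back into a single total
    if item_id not in HOT_ITEMS:
        return cms.query(item_id)
    return sum(cms.query(f"{item_id}#{salt}") for salt in range(SALT_FANOUT))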

The "missed top item" caveat

Sharding by item_id has one subtle correctness issue: merging per-shard top-K lists can miss an item whose count is split across shards. With strict hash(item_id) partitioning, every event for an item lands on one shard, so its count is never split; but the moment you salt hot keys (above) or merge precomputed per-bucket lists, an item that sits just below the cutoff everywhere can be a genuine global heavy hitter while staying invisible to the merge. The fix when it matters: keep top 5K per shard, return top 10 globally — over-collect at the shard level so the long tail of "almost-top" items is preserved through the merge.

Rule of thumb: if you need top-K, keep top-(K × 50) per shard. The cost is negligible (50KB more per heap), the correctness improvement is huge for items distributed evenly across shards.
Step 11

Approximate vs. Exact — When You Can't Cheat

The whole design above only works because approximate is fine. The Trending page tolerates "video #7 is actually #8" — nobody verifies, nobody complains. But not every "top-K" use case is so forgiving.

✅ Approximate is fine — Trending UIs

  • YouTube Trending — viewers don't verify exact ranks
  • Twitter Trending Topics — fast-moving, noisy by nature
  • Reddit r/all — order shifts every refresh anyway
  • Most leaderboards consumed by humans

Use this design — Kafka + Flink + CMS + min-heap + Redis cache.

❌ Exact required — Money or compliance

  • Ad click counts (used for billing)
  • Vote tallies in elections
  • Inventory counts in commerce
  • Financial transactions

CMS would over-count by ~0.001%, which is fine for ranking but means "we billed Coca-Cola for 100 extra clicks they didn't get". Here you build an Ad Click Aggregator — Flink with exact per-key counters, idempotent writes via deduplication keys, and a transactional sink. More expensive, but correct to the cent.

The interview move: when asked about Top-K, your first clarifying question should be "what's the use case — ranking or accounting?" If they say ranking, you propose this sketch-based design. If they say accounting, you pivot to an exact aggregator with deduplication. Asking shows you understand the design space; just charging into one solution shows you don't.
Step 12

Cache Strategy & Fault Tolerance

Cache strategy — Redis ZSET as the read serving layer

Each (window, shard) tuple is one Redis sorted set: topk:1h:shard3 = { vid_8a3f2b: 1240000, vid_91e0a4: 980000, ... }. The Window Aggregator refreshes it every minute (or every window-tick, configurably). TTL is set to 2 × window_size so a temporarily-stalled aggregator doesn't expose users to empty data.
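
In miniature, the per-flush write the Window Aggregator ⑥ performs against Redis, using redis-py as an illustrative client; the key pattern and the 2 × window TTL rule are the ones described above, and the broker address is assumed.

import redis

r = redis.Redis(host="redis", port=6379)      # assumed address

WINDOW_SECONDS = {"1m": 60, "1h": 3_600, "24h": 86_400}

def flush_shard_topk(window: str, shard: int, heap):
    """heap is the shard's current top-K as (approx_count, item_id) tuples."""
    key = f"topk:{window}:shard{shard}"
    pipe = r.pipeline()
    pipe.delete(key)                                          # replace the previous snapshot
    pipe.zadd(key, {item: count for count, item in heap})     # ZSET member=item, score=count
    pipe.expire(key, 2 * WINDOW_SECONDS[window])              # stale data evicts itself
    pipe.execute()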

The read path is a pure cache hit — sub-10ms even at 10K QPS. Cache miss is impossible in steady state because the aggregator pre-populates everything; if a key is missing it's a real outage of the aggregator, not a normal cache eviction. We monitor "topk cache miss rate" as a critical alert, not a performance metric.

Fault tolerance — what happens when Flink dies?

Flink checkpoints its full state (sketches + heaps + Kafka offsets) to S3 every 10 seconds. On task failure, Kubernetes restarts the task, which restores from the latest checkpoint and rewinds Kafka to the checkpointed offset. Result: at most 10 seconds of events are reprocessed; zero events are lost.

Sketch state per shard is small (a few GB), so checkpoints are fast — typically under 2 seconds to write to S3. Restore time is similar. End-to-end recovery from a node death: under 30 seconds, and the only impact is that the freshest ~10s of events are reprocessed and their window recomputed; checkpointed offsets plus Flink's exactly-once semantics keep them from being double-counted.

What about Kafka dying?

Kafka itself is replicated 3× across availability zones, so a single broker death is invisible. A whole-cluster outage is a different beast — but in that case the API Gateway buffers events in a small local queue and refuses ingest after a few seconds, while the read path keeps serving stale-but-correct top-K from Redis. Trending degrades gracefully: it stops updating but doesn't go blank.

Backfill via Replay

If we discover a counting bug or want to add a new window (say, a 7-day one), the Replay Service ⑩ streams events from S3 back through a parallel Flink cluster, builds the new sketches from scratch, and atomically swaps them into Redis when complete. The live system is never disturbed.

The takeaway: probabilistic systems are not "best effort" systems. With Kafka durability, Flink checkpointing, and S3-backed cold storage, the Top-K pipeline gives the same uptime guarantees as any production-grade ETL — the approximation is in the answer's last few digits, never in whether the answer arrives.
Step 13

Interview Q&A

Why approximate? Why not just count exactly?
Memory and speed. Exact counts mean a counter per distinct item — at 1B items × 16 bytes that's 16GB of counters, but more critically every increment is a hash-map lookup with potentially poor cache behavior. CMS is 2MB, fits in L2 cache, and increments are simple array writes. For ranking (where small errors don't matter), the trade-off is a no-brainer. For billing, you'd build a different system.
How does Count-Min Sketch differ from a Bloom filter?
Different question, similar shape. Bloom filter answers "have I seen this item before? (yes/no)" with bit cells and OR semantics. CMS answers "how many times have I seen this item? (a number)" with integer cells and increment semantics. Bloom can give false positives (says yes when truth is no); CMS can give over-estimates (says 105 when truth is 100). Both use multiple hash functions and bounded memory — that's where the family resemblance ends.
How do you handle a sliding window of 24h with minute granularity?
Bucketed sketches. Keep one sketch per minute, so 1,440 sketches per shard cover the last 24 hours. To answer "last hour" merge the most recent 60. To roll the window forward, drop the oldest bucket. Hot buckets live in RAM; cold buckets sit on RocksDB SSD; aged-out buckets are deleted. For very common windows (1h, 24h) we pre-aggregate in the Window Aggregator so reads are a single ZSET lookup.
Why partition Kafka by item_id and not by time?
Two reasons. (1) Time-partitioning means all events for the current minute go to one partition — a 100K/sec firehose into one consumer, while the others idle. Throughput is bottlenecked. (2) item_id partitioning keeps all events for a given item on the same shard, which is what makes per-shard counts complete and per-shard heaps meaningful. Hot items can still concentrate on one partition (the viral video), defused by sub-key salting or producer-side pre-aggregation.
What if an item is just-below threshold on every shard — could it actually be the global #1?
In principle, yes, but only when an item's count is split across shards. Each shard's heap of size K throws away items at rank K+1; if item X sits just below the cutoff on several shards, its combined count could beat any single shard's #1 while staying invisible to the merge. With strict item_id partitioning all of X's events land on one shard, so its count is never split this way; the risk appears once you salt hot keys or merge precomputed per-bucket lists. The defensive fix: keep top (K × 50) per shard, return top K globally. Costs ~50KB per heap; eliminates the long-tail miss.
How would you change the design for exact counts (e.g., ad clicks for billing)?
Replace CMS with exact per-key counters in Flink state, and add deduplication. Each event carries a unique event_id; Flink keeps a Bloom filter (or short-TTL key-value store) of recently-seen IDs to drop duplicates from at-least-once Kafka delivery. Counters are stored exactly in RocksDB (one per ad_id), updated transactionally, and aggregated to a transactional sink (e.g. Postgres or Iceberg). Costs more memory and CPU than CMS, but gives you the cents-accurate counts that billing requires. This system is usually called an "Ad Click Aggregator".
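
For contrast with the sketch-based path, a toy version of the exact, deduplicating counter this answer describes. In a real pipeline the seen-ID set would be a Bloom filter or short-TTL key-value store and the counters would live in Flink's RocksDB state; the dict and set here are stand-ins to show the logic.

from collections import Counter

class ExactClickAggregator:
    def __init__(self):
        self.seen_event_ids = set()      # production: Bloom filter or short-TTL KV store
        self.counts = Counter()          # production: exact per-key state in RocksDB

    def on_click(self, event_id: str, ad_id: str):
        if event_id in self.seen_event_ids:
            return                       # duplicate delivery from at-least-once Kafka: drop it
        self.seen_event_ids.add(event_id)
        self.counts[ad_id] += 1          # exact, billable count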
How fresh is the data — what's the lag from event-to-Trending?
Tunable, typically under 1 minute. The dominant component is the Window Aggregator's flush cadence — if it fires every 60s, the freshest displayed top-K is at most 60s stale. You can tighten this to 10s or even 1s at the cost of more frequent Redis writes and slightly fuzzier counts (less data per bucket = wider relative error). Real systems usually pick 30–60s as a sweet spot.
How do you size the Count-Min Sketch (pick d and w)?
Pick the error bounds first, derive sizes second. CMS guarantees: error ≤ ε × N with probability 1 - δ, where w = ⌈e/ε⌉ and d = ⌈ln(1/δ)⌉. For our case: 100M events per minute, want error under 0.01% of total = 10K, want 99.9% confidence. So ε = 1e-4, δ = 1e-3 → w ≈ 27,200, d ≈ 7. Round up to w=32K, d=8 → sketch size = 1MB. Cheap.
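
The same derivation as runnable arithmetic, checking the numbers in this answer:

import math

N       = 100_000_000            # events per minute
epsilon = 10_000 / N             # error budget: 0.01% of N  ->  1e-4
delta   = 1e-3                   # want 99.9% confidence

w = math.ceil(math.e / epsilon)          # columns
d = math.ceil(math.log(1 / delta))       # rows
print(w, d)                              # -> 27183 7; round up to 32K × 8
print(32_768 * 8 * 4 / 2**20, "MiB")     # -> 1.0 MiB per sketch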
The one-line summary the interviewer remembers: "It's a Kafka-fed Flink pipeline that maintains per-shard Count-Min Sketches plus min-heaps in bucketed time windows, with the per-window per-shard top-K precomputed into Redis sorted sets so the read path is one cache lookup — approximate counts, exact uptime, sub-100ms reads at any scale."