Reliable Topic Workflow: Understanding Message Flow During Broker Moves
Document Purpose
This document explains how Danube's reliable topics work under the hood, with a focus on what happens when a topic is moved from one broker to another.
Topic Moves: Why and When
A topic may be moved between brokers for several reasons:
- Manual Load Balancing (Current): An operator uses `danube-admin-cli topics unload` to explicitly move a topic
- Automated Load Balancing (Future): The LoadManager detects imbalance and automatically reassigns topics
- Broker Maintenance: A broker needs to be taken offline for upgrades or repairs
- Failure Recovery: A broker crashes and topics need to be reassigned
In all cases, the underlying mechanics are the same: the topic is unloaded from one broker and loaded onto another while preserving message offset continuity and consumer progress.
Core Concepts
Before diving into the workflow, let's establish key concepts:
- WAL (Write-Ahead Log): Local, per-topic storage on each broker for recent messages
- Cloud Storage: Long-term archival storage for historical messages (S3-compatible)
- Offset: A monotonically increasing ID (0, 1, 2, ...) assigned to each message in a topic
- Sealed State: Metadata written to ETCD when a topic is unloaded, capturing the last committed offset
- Subscription Cursor: Tracks which message a consumer last acknowledged
- Tiered Reading: Strategy where consumers read old messages from cloud, new messages from WAL
The critical guarantee: Offsets must be globally unique and continuous for a topic, regardless of which broker hosts it.
Normal Operation (Single Broker)
When a topic is hosted on a single broker with no moves, the workflow is straightforward. This section establishes the baseline behavior that topic moves must preserve.
Message Production Flow
Producer sends message
    ↓
Topic.append_to_storage()
    ↓
WAL.append(msg)
    ├─> Assign offset: next_offset.fetch_add(1)   // e.g., 0, 1, 2, 3...
    ├─> Store in cache: cache.insert(offset, msg)
    ├─> Write to disk: Writer queue (async fsync)
    └─> Broadcast: tx.send(offset, msg)            // for live consumers
    ↓
Uploader (background task, every 30s)
    ├─> Read WAL files
    ├─> Stream frames to cloud storage
    └─> Write object descriptor to ETCD
          • /storage/topics/default/topic/objects/00000000000000000000
          • {start_offset: 0, end_offset: 21, object_id: "data-0-21.dnb1"}
What's happening here?
Each message produced to a topic goes through a three-stage pipeline:
- Offset Assignment: The WAL atomically increments `next_offset` and assigns it to the message. This offset is the message's permanent ID.
- Local Persistence: The message is written to the WAL (both in-memory cache and on-disk log) and broadcast to live consumers via a channel.
- Cloud Upload: A background uploader periodically batches WAL segments and uploads them to cloud storage, writing object descriptors to ETCD for later retrieval.
The key insight: The offset is assigned by the WAL on the hosting broker. When a topic moves, the new broker must continue the offset sequence from where the old broker left off.
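The sketch below illustrates this append path in miniature. It is not the actual Danube implementation: the `Wal` struct, its fields, and the in-memory-only cache are simplified stand-ins for the components described above.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

// Simplified stand-in for the per-topic WAL described above.
struct Wal {
    next_offset: AtomicU64,               // next offset to hand out (0, 1, 2, ...)
    cache: Mutex<BTreeMap<u64, Vec<u8>>>, // recent messages kept in memory
    // the async disk-writer queue and the live broadcast channel are omitted
}

impl Wal {
    fn new(initial_offset: u64) -> Self {
        Self {
            next_offset: AtomicU64::new(initial_offset),
            cache: Mutex::new(BTreeMap::new()),
        }
    }

    /// Assign the message its permanent offset and keep it in the cache.
    fn append(&self, msg: Vec<u8>) -> u64 {
        // 1. Offset assignment: an atomic fetch_add yields a unique, increasing ID.
        let offset = self.next_offset.fetch_add(1, Ordering::SeqCst);
        // 2. Local persistence: insert into the in-memory cache
        //    (the real WAL also enqueues the frame for an async fsync).
        self.cache.lock().unwrap().insert(offset, msg);
        // 3. The real WAL would now broadcast (offset, msg) to live tail readers.
        offset
    }
}

fn main() {
    let wal = Wal::new(0);
    for i in 0..4 {
        let offset = wal.append(format!("msg-{i}").into_bytes());
        println!("assigned offset {offset}");
    }
}
```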
Message Consumption Flow
Consumer subscribes
    ↓
SubscriptionEngine.init_stream_from_progress_or_latest()
    ├─> Check ETCD for cursor: /topics/.../subscriptions/sub_name/cursor
    ├─> If exists: start_offset = first unacked offset (e.g., 6)
    └─> If not exists: start_offset = latest
    ↓
WalStorage.create_reader(start_offset=6)
    ├─> Get WAL checkpoint: wal_start_offset = 0
    ├─> Compare: 6 >= 0 (within WAL retention)
    └─> Return: WAL.tail_reader(from=6, live=false)
          ├─> Replay from cache/files: offsets 6, 7, 8...
          └─> Switch to live: broadcast channel for new messages
    ↓
Dispatcher.poll_next()
    ├─> Read message from stream
    ├─> Send to consumer
    └─> Wait for ACK
    ↓
On ACK received:
    ├─> SubscriptionEngine.on_acked(offset)
    └─> Periodically flush cursor to ETCD (every 1000 acks or 5s)
What's happening here?
Consumers track their progress through a topic using a cursor (subscription offset) stored in ETCD:
- Initialization: When a consumer subscribes, the subscription engine checks ETCD for an existing cursor. If found, it resumes from that offset; otherwise, it starts from the latest.
- Tiered Reading: The `WalStorage` determines where to read messages from:
  - If the requested offset is within the WAL's retention range, read directly from WAL
  - If older, read from cloud storage first, then chain to WAL for newer messages
- Progress Tracking: As messages are acknowledged, the cursor advances and is periodically persisted to ETCD
This cursor-based design is broker-agnostic: it doesn't matter which broker hosts the topic, as long as the offset space is continuous.
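As a rough illustration of that decision, the sketch below models the cursor lookup and the WAL-versus-cloud choice with plain in-memory types. `MetadataStore`, `ReadPlan`, and `plan_read` are invented names for this example, not the real `SubscriptionEngine`/`WalStorage` API.

```rust
use std::collections::HashMap;

// Stand-in for the subscription cursors persisted in ETCD (name -> last acked offset).
struct MetadataStore {
    cursors: HashMap<String, u64>,
}

// Where the first messages of a new reader should come from.
enum ReadPlan {
    // Offset is still retained locally: replay from the WAL, then go live.
    WalOnly { from: u64 },
    // Offset is older than local retention: read cloud objects first, then chain to the WAL.
    CloudThenWal { cloud_from: u64, wal_from: u64 },
}

fn plan_read(store: &MetadataStore, subscription: &str, wal_start_offset: u64, latest: u64) -> ReadPlan {
    // Resume from the first unacknowledged offset, or from "latest" if no cursor exists.
    let start = store
        .cursors
        .get(subscription)
        .map(|last_acked| last_acked + 1)
        .unwrap_or(latest);

    if start >= wal_start_offset {
        ReadPlan::WalOnly { from: start }
    } else {
        ReadPlan::CloudThenWal { cloud_from: start, wal_from: wal_start_offset }
    }
}

fn main() {
    let store = MetadataStore {
        cursors: HashMap::from([("subs_reliable".to_string(), 5)]),
    };
    // The WAL still retains everything from offset 0, so the reader stays on the WAL.
    match plan_read(&store, "subs_reliable", 0, 22) {
        ReadPlan::WalOnly { from } => println!("read WAL from offset {from}"),
        ReadPlan::CloudThenWal { cloud_from, wal_from } => {
            println!("read cloud {cloud_from}..={}, then WAL from {wal_from}", wal_from - 1)
        }
    }
}
```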
State at Rest (Single Broker)
Broker Memory:
  WAL:
    next_offset: 22        ← Ready for next message
    cache: [17..21]        ← Recent messages (capacity=1024)
  Uploader:
    checkpoint: 21         ← Last uploaded offset
  Subscription "subs_reliable":
    cursor_in_memory: 13   ← Last acked by consumer

ETCD:
  /topics/default/reliable_topic/delivery: "Reliable"
  /topics/default/reliable_topic/subscriptions/subs_reliable/cursor: 13
  /storage/topics/default/reliable_topic/objects/00000000000000000000:
    {start_offset: 0, end_offset: 21, object_id: "data-0-21.dnb1"}

Cloud Storage:
  data-0-21.dnb1 (offsets 0-21)
Topic Move Workflow (With Fix)
This section describes the complete flow when a topic is moved from Broker A to Broker B. The move can be triggered in two ways:
- Manual (Current): An operator runs `danube-admin-cli topics unload /default/reliable_topic`
- Automated: The LoadManager detects load imbalance and triggers reassignment programmatically
Regardless of trigger, the mechanics are identical. The critical requirement is to preserve offset continuity so that:
- Producers on the new broker continue assigning offsets from where the old broker left off
- Consumers can read the entire message history seamlessly across both brokers
- No message is lost, duplicated, or assigned a conflicting offset
The fix introduced a sealed state mechanism that captures the last committed offset when unloading and uses it to initialize the WAL on the new broker.
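To make the shape of that state concrete, here is a minimal sketch of the sealed-state record, using the field names that appear in the ETCD value shown later in this document; the Rust struct itself is illustrative, not the actual type in the codebase.

```rust
// Illustrative model of the sealed state written to
// /storage/topics/<namespace>/<topic>/state at unload time.
#[derive(Debug)]
struct SealedState {
    sealed: bool,
    last_committed_offset: u64, // highest offset durably stored before the unload
    broker_id: u64,             // broker that performed the seal
    timestamp: u64,             // unix seconds when the seal was written
}

fn main() {
    // Written by the old broker at unload time, read by the new broker at load time.
    let sealed = SealedState {
        sealed: true,
        last_committed_offset: 21,
        broker_id: 10_285_063_371_164_059_634,
        timestamp: 1_768_625_254,
    };
    // The new broker continues the offset sequence right after the sealed offset.
    let initial_offset = sealed.last_committed_offset + 1;
    println!("sealed state: {sealed:?}");
    println!("new broker starts its WAL at offset {initial_offset}");
}
```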
Step 1: Unload from Broker A
Trigger: Either `danube-admin-cli topics unload /default/reliable_topic` or an automated LoadManager decision
BrokerService.topic_cluster.post_unload_topic()
    ↓
Create unload marker in ETCD
    • /cluster/unassigned/default/reliable_topic
    • {reason: "unload", from_broker: 10285063371164059634}
    ↓
Delete broker assignment
    • /cluster/brokers/10285063371164059634/default/reliable_topic
    ↓
Broker A watcher sees DELETE event
    ↓
TopicManager.unload_reliable_topic()
    ├─> Flush subscription cursors to ETCD
    │     • /topics/.../subscriptions/subs_reliable/cursor: 13
    │
    ├─> WalFactory.flush_and_seal()
    │     ├─> WAL.flush() → fsync all pending writes
    │     │
    │     ├─> Uploader: cancel and drain
    │     │     ├─> Upload remaining frames to cloud
    │     │     └─> Update ETCD descriptors
    │     │           • objects/00000000000000000000: end_offset=21
    │     │
    │     └─> Write sealed state to ETCD ✅
    │           • /storage/topics/default/reliable_topic/state
    │           • {
    │               sealed: true,
    │               last_committed_offset: 21,   ← CRITICAL!
    │               broker_id: 10285063371164059634,
    │               timestamp: 1768625254
    │             }
    │
    └─> Delete local WAL files
          • rm ./danube-data/wal/default/reliable_topic/*
Broker A Final State:
- ✅ No local WAL files
- ✅ All messages 0-21 in cloud storage
- ✅ Subscription cursor at 13 in ETCD
- ✅ Sealed state at 21 in ETCD
What just happened?
The unload process ensures data durability and captures critical state:
- Flush Everything: All pending writes are committed, and the uploader drains its queue to ensure all messages reach cloud storage
- Write Sealed State: The broker writes `{sealed: true, last_committed_offset: 21}` to ETCD. This is the key piece of state that the new broker will use to initialize its WAL correctly.
- Clean Up Local State: The WAL files are deleted since they are now in cloud storage, and the topic is removed from the broker
At this point, Broker A no longer owns the topic, but all messages 0-21 are safely stored in the cloud, and the metadata in ETCD contains everything needed to restore the topic elsewhere.
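The ordering of these steps can be summarized in a short sketch. The `TopicStorage` trait and `MockStorage` type below are illustrative stand-ins for the real WAL, uploader, and ETCD client; only the sequence of the steps is taken from the flow above.

```rust
// Illustrative sketch of the unload ordering on the old broker.
trait TopicStorage {
    fn flush_wal(&mut self);                           // fsync all pending writes
    fn drain_uploader(&mut self) -> u64;               // finish uploads, return last uploaded offset
    fn write_sealed_state(&mut self, last_committed_offset: u64);
    fn delete_local_wal_files(&mut self);
}

fn unload_reliable_topic<S: TopicStorage>(storage: &mut S) {
    // 1. Flush everything: every accepted message becomes durable locally ...
    storage.flush_wal();
    // ... and the uploader drains so all frames reach cloud storage, with
    // object descriptors written to ETCD.
    let last_committed_offset = storage.drain_uploader();
    // 2. Write the sealed state: the one piece of metadata the next broker
    //    needs in order to continue the offset sequence.
    storage.write_sealed_state(last_committed_offset);
    // 3. Clean up: local WAL files are redundant once everything is in the cloud.
    storage.delete_local_wal_files();
}

struct MockStorage { last_uploaded: u64 }

impl TopicStorage for MockStorage {
    fn flush_wal(&mut self) { println!("fsync WAL"); }
    fn drain_uploader(&mut self) -> u64 { println!("drain uploader"); self.last_uploaded }
    fn write_sealed_state(&mut self, offset: u64) { println!("seal at offset {offset}"); }
    fn delete_local_wal_files(&mut self) { println!("rm local WAL files"); }
}

fn main() {
    unload_reliable_topic(&mut MockStorage { last_uploaded: 21 });
}
```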
Step 2: Assign to Broker B
LoadManager assigns topic to Broker B (cluster decides which broker gets it based on load)
Write to ETCD:
    • /cluster/brokers/10421046117770015389/default/reliable_topic: null
    ↓
Broker B watcher sees PUT event
    ↓
TopicManager.ensure_local("/default/reliable_topic")
    ↓
WalFactory.for_topic("default/reliable_topic")
    ↓
get_or_create_wal("default/reliable_topic")
    │
    ├─> ✅ NEW: Check sealed state
    │     EtcdMetadata.get_storage_state_sealed("default/reliable_topic")
    │     Returns: {sealed: true, last_committed_offset: 21, ...}
    │
    ├─> ✅ NEW: Calculate initial offset
    │     initial_offset = 21 + 1 = 22
    │
    ├─> ✅ NEW: Create CheckpointStore (empty on new broker)
    │     wal.ckpt: None
    │     uploader.ckpt: None
    │
    └─> ✅ NEW: Create WAL with initial offset
          Wal::with_config_with_store(cfg, ckpt_store, Some(22))
            └─> next_offset: AtomicU64::new(22)   ← Continues from 22!
    ↓
Start Uploader
    ├─> Check checkpoint: None (new broker)
    ├─> ✅ Create initial checkpoint from sealed state
    │     uploader_checkpoint: {last_committed_offset: 21, ...}
    └─> Ready to upload from offset 22+
Broker B Initial State:
WAL:
  next_offset: 22        ✅ Continues where A left off
  cache: []              (empty initially)
Uploader:
  checkpoint: 21         ✅ From sealed state
Subscription "subs_reliable":
  Not loaded yet (no consumer connected)
What just happened?
This is the critical fix that prevents offset collisions:
- Sealed State Discovery: When Broker B loads the topic, it checks ETCD for a sealed state marker
- Offset Calculation: Finding `last_committed_offset: 21`, it calculates `initial_offset = 22`
- WAL Initialization: The WAL is created with `next_offset = 22` instead of the default `0` (see the sketch after this list)
- Uploader Restoration: The uploader checkpoint is set to 21, so it knows messages 0-21 are already in cloud storage
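Here is a condensed sketch of that load-time decision, assuming the sealed state has already been fetched from ETCD. The helper names are illustrative; only `Wal::with_config_with_store(cfg, ckpt_store, Some(22))` is quoted from the flow above.

```rust
use std::sync::atomic::AtomicU64;

// Only the field needed for offset continuity is modeled here.
struct SealedState { last_committed_offset: u64 }

struct Wal { next_offset: AtomicU64 }

impl Wal {
    /// Mirrors the idea of `Wal::with_config_with_store(cfg, ckpt_store, Some(22))`:
    /// an explicit initial offset overrides the default of 0.
    fn with_initial_offset(initial: Option<u64>) -> Self {
        Self { next_offset: AtomicU64::new(initial.unwrap_or(0)) }
    }
}

/// Decide where the new broker's WAL should start counting.
fn initial_offset_from_seal(sealed: Option<&SealedState>) -> Option<u64> {
    // Sealed state present -> continue right after the last committed offset.
    // No sealed state -> brand-new topic, start at 0.
    sealed.map(|s| s.last_committed_offset + 1)
}

fn main() {
    let sealed = Some(SealedState { last_committed_offset: 21 });
    let wal = Wal::with_initial_offset(initial_offset_from_seal(sealed.as_ref()));
    // next_offset is now 22, so the first message produced on the new broker
    // cannot collide with anything uploaded by the old broker.
    println!(
        "next offset = {}",
        wal.next_offset.load(std::sync::atomic::Ordering::SeqCst)
    );
}
```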
Step 3: Producer Reconnects and Sends Messages
Producer automatic reconnection: The Danube client detects the topic has moved and automatically reconnects to Broker B
Producer connects to Broker B
    ↓
Sends 6 new messages (message IDs 2-7 in producer)
    ↓
Broker B assigns offsets:
    WAL.append(msg_2) → offset 22 ✅
    WAL.append(msg_3) → offset 23 ✅
    WAL.append(msg_4) → offset 24 ✅
    WAL.append(msg_5) → offset 25 ✅
    WAL.append(msg_6) → offset 26 ✅
    WAL.append(msg_7) → offset 27 ✅
    ↓
Messages stored:
    ├─> WAL cache: [22, 23, 24, 25, 26, 27]
    ├─> WAL file: ./danube-data/wal/default/reliable_topic/wal.log
    └─> Broadcast: (for live consumers)
    ↓
Uploader runs (30s later):
    ├─> Stream offsets 22-27 from WAL files
    ├─> Upload to cloud: data-22-27.dnb1
    └─> Write to ETCD:
          • objects/00000000000000000022:
            {start_offset: 22, end_offset: 27, ...}
State After Production:
Cloud Storage:
  data-0-21.dnb1   (from Broker A)
  data-22-27.dnb1  (from Broker B) ✅ Continuous!

ETCD Objects:
  /storage/.../objects/00000000000000000000: {start: 0, end: 21}
  /storage/.../objects/00000000000000000022: {start: 22, end: 27} ✅

WAL on Broker B:
  next_offset: 28
  cache: [22, 23, 24, 25, 26, 27]
What just happened?
The producer has no awareness that the topic moved; from its perspective, it is just producing messages as normal:
- Client Routing: The Danube client queries ETCD for the topic's current broker assignment and connects to Broker B
- Continuous Offsets: Messages are assigned offsets 22-27, which is exactly what we want: a continuation of Broker A's 0-21
- Cloud Upload: After 30 seconds (or when the segment is large enough), the uploader streams offsets 22-27 to cloud storage
The offset space is continuous across both brokers: [0..21] (Broker A) + [22..27] (Broker B). This is the foundation for consumers to work correctly.
Step 4: Consumer Reconnects
Consumer automatic reconnection: The Danube client detects the topic has moved and reconnects to Broker B with the same subscription
Consumer subscribes to "subs_reliable"
    ↓
SubscriptionEngine.init_stream_from_progress_or_latest()
    ├─> Read cursor from ETCD: 13
    └─> start_offset = 14 (next unacked message)
    ↓
WalStorage.create_reader(start_offset=14)
    │
    ├─> Get WAL checkpoint on Broker B:
    │     wal_start_offset = 22 (WAL only has 22+)
    │
    ├─> Compare: 14 < 22 (requested offset is OLDER than WAL)
    │
    └─> ✅ Tiered Reading Strategy:
          │
          ├─> Step 1: Read from CLOUD (14-21)
          │     CloudReader.read_range(14, 21)
          │       ├─> Query ETCD: get objects covering 14-21
          │       │     Returns: objects/00000000000000000000 (0-21)
          │       ├─> Download: data-0-21.dnb1
          │       ├─> Extract frames: offsets 14-21
          │       └─> Stream: [msg_14, msg_15, ..., msg_21]
          │
          └─> Step 2: Chain to WAL (22+)
                WAL.tail_reader(22, false)
                  ├─> Read from cache: [22, 23, 24, 25, 26, 27]
                  ├─> Stream: [msg_22, msg_23, ..., msg_27]
                  └─> Then switch to live broadcast for future messages
    ↓
Consumer receives continuous stream:
    [14, 15, 16, 17, 18, 19, 20, 21]   ← from cloud
    [22, 23, 24, 25, 26, 27]           ← from WAL
    ↓
No gaps, no duplicates!
What just happened?
This is where the magic of tiered reading shines:
- Cursor Resume: The consumer's subscription cursor was at 13 (last ACKed message), so it resumes from offset 14
- Storage Decision: The consumer asks for offset 14, but Broker B's WAL only has offsets 22+. The system detects this gap.
- Cloud Fallback: For offsets 14-21, the reader automatically fetches data from cloud storage (uploaded by Broker A)
- WAL Handoff: Once caught up to offset 22, the reader seamlessly switches to reading from Broker B's local WAL
- Continuous Stream: From the consumer's perspective, it is a single continuous stream of messages; it has no idea the topic moved!
This works only because offsets are continuous: there's no collision between Broker A's offsets (0-21) and Broker B's offsets (22+).
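The sketch below captures the chaining idea with plain vectors of offsets. The two helper functions are stand-ins for the `CloudReader` and `WAL.tail_reader` paths; the real implementation streams frames asynchronously instead of collecting them into vectors.

```rust
// Stand-in for downloading the covering cloud objects (e.g. data-0-21.dnb1)
// and extracting only the requested frames.
fn read_from_cloud(from: u64, to_inclusive: u64) -> Vec<u64> {
    (from..=to_inclusive).collect()
}

// Stand-in for WAL.tail_reader(from, live=false) replaying the local cache.
fn read_from_wal(from: u64, wal_cache: &[u64]) -> Vec<u64> {
    wal_cache.iter().copied().filter(|&offset| offset >= from).collect()
}

fn main() {
    let start_offset = 14;      // cursor 13 + 1
    let wal_start_offset = 22;  // Broker B's WAL only holds 22+
    let wal_cache = [22, 23, 24, 25, 26, 27];

    // Cloud covers the gap, then the local WAL takes over: one continuous stream.
    let mut stream = read_from_cloud(start_offset, wal_start_offset - 1);
    stream.extend(read_from_wal(wal_start_offset, &wal_cache));

    // Offsets must increase by exactly one: no gaps, no duplicates.
    assert!(stream.windows(2).all(|pair| pair[1] == pair[0] + 1));
    println!("delivered offsets: {stream:?}");
}
```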
Step 5: Consumer ACKs and Cursor Updates
Consumer processes and ACKs messages:
    ACK(14) → ACK(15) → ... → ACK(27)
    ↓
SubscriptionEngine.on_acked(offset)
    ├─> Update in-memory cursor: 27
    └─> Batch flush to ETCD (every 1000 acks or 5s):
          • /topics/.../subscriptions/subs_reliable/cursor: 27
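A minimal sketch of the batched cursor flush follows, assuming the thresholds quoted above (1000 acks or 5 seconds). `SubscriptionCursor` and the ETCD write stand-in are illustrative, not the actual engine types.

```rust
use std::time::{Duration, Instant};

// Illustrative in-memory cursor with batched persistence.
struct SubscriptionCursor {
    last_acked: u64,
    acks_since_flush: u64,
    last_flush: Instant,
}

impl SubscriptionCursor {
    fn on_acked(&mut self, offset: u64) {
        // The cursor only moves forward.
        self.last_acked = self.last_acked.max(offset);
        self.acks_since_flush += 1;

        let flush_due = self.acks_since_flush >= 1000
            || self.last_flush.elapsed() >= Duration::from_secs(5);
        if flush_due {
            // Stand-in for writing /topics/.../subscriptions/<sub>/cursor to ETCD.
            println!("flush cursor={} to ETCD", self.last_acked);
            self.acks_since_flush = 0;
            self.last_flush = Instant::now();
        }
    }
}

fn main() {
    let mut cursor = SubscriptionCursor {
        last_acked: 13,
        acks_since_flush: 0,
        last_flush: Instant::now(),
    };
    for offset in 14..=27 {
        cursor.on_acked(offset);
    }
    // With only 14 acks and under 5 seconds elapsed, the flush is deferred
    // until the timer fires or the batch threshold is reached.
}
```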
Key Mechanisms Ensuring Continuity
1. Offset Continuity via Sealed State
Old Broker (unload):
  last_committed_offset: 21
    ↓ (written to sealed state in ETCD)
New Broker (load):
  next_offset = 21 + 1 = 22
  Offset sequence continues without collision
2. Consumer Read Strategy (Tiered)
Consumer wants offset X
    ↓
Is X in WAL range?
  YES → Read from WAL only
  NO  → Read from Cloud, then chain to WAL
3. Uploader State Preservation
Old Broker:
  Uploader checkpoint: 21
    ↓ (written to sealed state)
New Broker:
  Uploader checkpoint: 21 (from sealed state)
  Won't re-upload 0-21 ✅
  Will upload 22+ ✅
4. Subscription Cursor Independence
Subscription cursor in ETCD: 13
    ↓ (persisted independently)
Consumer reads from 14 regardless of which broker
    ↓
Works because the offset space is continuous
Complete Timeline Example
| Time | Event | Broker A | Broker B | Cloud | Consumer |
|---|---|---|---|---|---|
| T1 | Produce msgs | Offsets 0-21 | - | - | - |
| T2 | Upload | - | - | 0-21 stored | - |
| T3 | Consumer reads | - | - | - | Reads 0-13, cursor=13 |
| T4 | UNLOAD | Seal state (21) | - | State saved | - |
| T5 | Assign to B | - | Load, WAL=22 ✅ | - | - |
| T6 | Produce msgs | - | Offsets 22-27 ✅ | - | - |
| T7 | Upload | - | - | 22-27 stored | - |
| T8 | Consumer reconnect | - | - | - | Reads 14-21 (cloud) |
| T9 | Continue read | - | - | - | Reads 22-27 (WAL) ✅ |
| T10 | ACK all | - | - | - | Cursor=27 ✅ |
Conclusion
The reliable topic workflow is designed to be broker-agnostic: a topic can be moved between brokers without any impact on producers or consumers, as long as offset continuity is preserved.
The sealed state mechanism is the linchpin that enables this:
- Old broker captures `last_committed_offset` when unloading
- New broker reads this state and initializes its WAL from `last_committed_offset + 1`
- Offset space remains continuous across the move
- Consumers read seamlessly from cloud (old messages) and WAL (new messages)
The architecture is built to handle topic mobility at scale, making Danube suitable for cloud-native deployments where resources are constantly shifting.