Message Structure
Messages are the fundamental unit of data in Danube. Each message contains a payload plus rich metadata for routing, tracking, acknowledgment, and schema validation.
StreamMessage Structure
```rust
use std::collections::HashMap;

pub struct StreamMessage {
    pub request_id: u64,
    pub msg_id: MessageID,                 // see MessageID Structure below
    pub payload: Vec<u8>,
    pub publish_time: u64,                 // Unix time in milliseconds
    pub producer_name: String,
    pub subscription_name: Option<String>, // set by the broker on delivery
    pub attributes: HashMap<String, String>,
    pub schema_id: Option<u64>,
    pub schema_version: Option<u32>,
}
```
Field Reference
| Field | Type | Description |
|---|---|---|
| request_id | u64 | Unique identifier for tracking the message request across the system |
| msg_id | MessageID | Composite identifier containing routing and location information |
| payload | Vec<u8> | The actual message content in binary format |
| publish_time | u64 | Unix timestamp (milliseconds) when the message was published |
| producer_name | String | Name of the producer that sent this message |
| subscription_name | Option<String> | Name of the subscription (set by broker for consumer delivery) |
| attributes | HashMap<String, String> | User-defined key-value pairs for custom metadata |
| schema_id | Option<u64> | Schema Registry ID for message validation (see Schema Integration) |
| schema_version | Option<u32> | Version of the schema used to serialize this message |
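To show how the fields fit together, here is a minimal sketch of a producer-side helper that fills in a `StreamMessage`. The helper name `build_message` and the placeholder routing values are hypothetical, not part of Danube's client API. The struct definitions are local copies of the ones documented above so the example compiles on its own. It leaves `subscription_name` as `None` because the table says the broker sets it, and it leaves the schema fields empty because no schema is used here.

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

// Local copies of the documented structs so this sketch compiles on its own.
#[derive(Debug, Clone)]
pub struct MessageID {
    pub producer_id: u64,
    pub topic_name: String,
    pub broker_addr: String,
    pub topic_offset: u64,
}

#[derive(Debug, Clone)]
pub struct StreamMessage {
    pub request_id: u64,
    pub msg_id: MessageID,
    pub payload: Vec<u8>,
    pub publish_time: u64,
    pub producer_name: String,
    pub subscription_name: Option<String>,
    pub attributes: HashMap<String, String>,
    pub schema_id: Option<u64>,
    pub schema_version: Option<u32>,
}

/// Hypothetical helper: fills in the fields a producer controls.
/// subscription_name stays None (the broker sets it on delivery) and the
/// schema fields stay None because this message is not schema-validated.
fn build_message(
    request_id: u64,
    msg_id: MessageID,
    producer_name: &str,
    payload: &[u8],
    attributes: HashMap<String, String>,
) -> StreamMessage {
    // publish_time is documented as Unix time in milliseconds.
    let publish_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_millis() as u64;

    StreamMessage {
        request_id,
        msg_id,
        payload: payload.to_vec(),
        publish_time,
        producer_name: producer_name.to_string(),
        subscription_name: None,
        attributes,
        schema_id: None,
        schema_version: None,
    }
}

fn main() {
    // Placeholder routing values; in a real deployment these come from the broker and topic.
    let msg_id = MessageID {
        producer_id: 1,
        topic_name: "/default/events".to_string(),
        broker_addr: "localhost:6650".to_string(),
        topic_offset: 0,
    };
    let mut attrs = HashMap::new();
    attrs.insert("source".to_string(), "web".to_string());

    let msg = build_message(7, msg_id, "producer-1", b"hello", attrs);
    println!("{} bytes from {} at {} ms", msg.payload.len(), msg.producer_name, msg.publish_time);
}
```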
MessageID Structure
The MessageID is a composite identifier that enables efficient routing and acknowledgment:
```rust
pub struct MessageID {
    pub producer_id: u64,
    pub topic_name: String,
    pub broker_addr: String,
    pub topic_offset: u64,
}
```
Field Reference
| Field | Type | Description |
|---|---|---|
| producer_id | u64 | Unique identifier for the producer within this topic |
| topic_name | String | Full topic name (e.g., /default/events) |
| broker_addr | String | Address of the broker that delivered this message |
| topic_offset | u64 | Monotonic position of the message within the topic |
Purpose:
- Routing: Broker address enables consumers to send acknowledgments to the correct broker
- Ordering: Topic offset provides strict ordering guarantees within a topic
- Deduplication: Combination of producer_id + topic_offset creates a unique message identifier
- Tracking: The request_id carried alongside the MessageID in StreamMessage links messages across distributed tracing systems
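The routing and deduplication roles listed above can be sketched on the consumer side as follows. This is a minimal illustration under stated assumptions: `Deduplicator`, `group_acks_by_broker`, and `make_id` are hypothetical names, and the deduplicator is scoped to a single topic, keyed on (producer_id, topic_offset) as the Purpose list describes. Acknowledgments are grouped by `broker_addr` so each batch can go to the broker that delivered it.

```rust
use std::collections::{HashMap, HashSet};

// Local copy of the documented MessageID.
#[derive(Debug, Clone)]
pub struct MessageID {
    pub producer_id: u64,
    pub topic_name: String,
    pub broker_addr: String,
    pub topic_offset: u64,
}

/// Hypothetical consumer-side deduplicator for a single topic,
/// keyed on (producer_id, topic_offset).
#[derive(Default)]
struct Deduplicator {
    seen: HashSet<(u64, u64)>,
}

impl Deduplicator {
    /// Returns true the first time an ID is seen and false for a redelivery.
    fn first_time(&mut self, id: &MessageID) -> bool {
        self.seen.insert((id.producer_id, id.topic_offset))
    }
}

/// Groups topic offsets by the broker that delivered them, so each batch
/// of acknowledgments can be sent to the correct broker.
fn group_acks_by_broker(ids: &[MessageID]) -> HashMap<String, Vec<u64>> {
    let mut batches: HashMap<String, Vec<u64>> = HashMap::new();
    for id in ids {
        batches
            .entry(id.broker_addr.clone())
            .or_default()
            .push(id.topic_offset);
    }
    batches
}

/// Hypothetical constructor for example IDs on one topic.
fn make_id(producer_id: u64, broker: &str, offset: u64) -> MessageID {
    MessageID {
        producer_id,
        topic_name: "/default/events".to_string(),
        broker_addr: broker.to_string(),
        topic_offset: offset,
    }
}

fn main() {
    // The second message is a redelivery of the first.
    let delivered = vec![
        make_id(1, "broker-a:6650", 10),
        make_id(1, "broker-a:6650", 10),
        make_id(2, "broker-b:6650", 11),
    ];
    let mut dedup = Deduplicator::default();
    let fresh: Vec<MessageID> = delivered.into_iter().filter(|m| dedup.first_time(m)).collect();
    println!("{} unique messages", fresh.len());
    for (broker, offsets) in group_acks_by_broker(&fresh) {
        println!("ack offsets {:?} via {}", offsets, broker);
    }
}
```

A `HashSet` keeps the membership check O(1) per message. A long-running consumer would also need to bound or expire this set, which the sketch does not attempt.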
Schema Integration
Danube integrates with the Schema Registry to provide type-safe messaging with schema validation.
Schema Fields in StreamMessage
schema_id: Option<u64>
Globally unique identifier assigned by the Schema Registry when a schema is registered.
- Present when the producer sends schema-validated messages
- Consumers use this ID to fetch the schema from the registry
- Enables schema caching: each message carries only the 8-byte ID instead of the full schema
- Required when the topic's validation policy is set to Enforce
schema_version: Option<u32>
Version number of the schema within its subject.
- Tracks which version of the schema was used to serialize this message
- Enables schema evolution tracking and debugging
- Allows consumers to handle multiple schema versions gracefully
- Starts at 1 and auto-increments with each schema update
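Because schema_id is globally unique, a consumer can cache schema definitions by ID and contact the registry only once per schema. The sketch below assumes a hypothetical `SchemaCache` type whose `fetch` closure stands in for a real Schema Registry lookup. It does not depict Danube's actual client API. Unknown IDs are deliberately not cached, so a later registration can still be picked up.

```rust
use std::collections::HashMap;

/// Hypothetical consumer-side schema cache. `fetch` stands in for a real
/// Schema Registry lookup and is only called on a cache miss.
struct SchemaCache<F: FnMut(u64) -> Option<String>> {
    fetch: F,
    by_id: HashMap<u64, String>,
    registry_calls: usize, // counts simulated registry round-trips
}

impl<F: FnMut(u64) -> Option<String>> SchemaCache<F> {
    fn new(fetch: F) -> Self {
        SchemaCache { fetch, by_id: HashMap::new(), registry_calls: 0 }
    }

    /// Returns the schema definition for `schema_id`, contacting the registry on a miss.
    /// Unknown IDs are not cached, so they are retried on the next call.
    fn get(&mut self, schema_id: u64) -> Option<&str> {
        if !self.by_id.contains_key(&schema_id) {
            self.registry_calls += 1;
            let definition = (self.fetch)(schema_id)?;
            self.by_id.insert(schema_id, definition);
        }
        self.by_id.get(&schema_id).map(String::as_str)
    }
}

fn main() {
    // Hypothetical registry contents: only schema_id 42 exists.
    let mut cache = SchemaCache::new(|id: u64| {
        if id == 42 { Some("user_event schema definition".to_string()) } else { None }
    });
    for _ in 0..1000 {
        // Every message on the topic carries schema_id 42.
        let _ = cache.get(42);
    }
    println!("registry calls for 1000 messages: {}", cache.registry_calls);
}
```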
How Schema Validation Works
1. Producer Flow
```text
┌─────────────┐
│  Producer   │
│             │
│ 1. Register │     ┌──────────────────┐
│    Schema   │────>│ Schema Registry  │
│             │     │                  │
│ 2. Get ID   │<────┤ Returns:         │
│             │     │ - schema_id: 42  │
│             │     │ - version: 1     │
│ 3. Serialize│     └──────────────────┘
│    Message  │
│             │
│ 4. Set      │  StreamMessage {
│    Fields   │    schema_id: Some(42),
│             │    schema_version: Some(1),
│             │    payload: <serialized>,
│ 5. Send     │    ...
│             │  }
└──────┬──────┘
       │
       v
  ┌─────────┐
  │ Broker  │
  └─────────┘
```
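The producer flow above can be sketched end to end with an in-memory stand-in for the registry. Everything here is hypothetical: `MockRegistry`, `UserEvent`, `SchemaFields`, `prepare`, and the subject name are illustrative, not Danube's API. The hand-written JSON serialization keeps the example dependency-free. The mock assigns globally unique IDs and per-subject versions starting at 1, which matches the schema_version description. Re-registering an identical definition returns the existing pair.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the Schema Registry.
/// Each subject keeps an ordered list of definitions; version = position + 1.
#[derive(Default)]
struct MockRegistry {
    next_id: u64,
    subjects: HashMap<String, Vec<(u64, String)>>,
}

impl MockRegistry {
    /// Steps 1-2: register `definition` under `subject` and return (schema_id, version).
    fn register(&mut self, subject: &str, definition: &str) -> (u64, u32) {
        let versions = self.subjects.entry(subject.to_string()).or_default();
        if let Some(pos) = versions.iter().position(|(_, d)| d == definition) {
            return (versions[pos].0, pos as u32 + 1); // already registered
        }
        self.next_id += 1; // schema IDs are unique across all subjects
        versions.push((self.next_id, definition.to_string()));
        (self.next_id, versions.len() as u32)
    }
}

/// Hypothetical event type. Serialization assumes `action` needs no JSON escaping.
struct UserEvent {
    user_id: u64,
    action: String,
}

/// Step 3: serialize the event to bytes.
fn serialize(event: &UserEvent) -> Vec<u8> {
    format!(r#"{{"user_id":{},"action":"{}"}}"#, event.user_id, event.action).into_bytes()
}

/// The StreamMessage fields the producer sets in step 4.
struct SchemaFields {
    schema_id: Option<u64>,
    schema_version: Option<u32>,
    payload: Vec<u8>,
}

/// Steps 1-4; step 5 (sending to the broker) is left to the client.
fn prepare(registry: &mut MockRegistry, subject: &str, definition: &str, event: &UserEvent) -> SchemaFields {
    let (schema_id, version) = registry.register(subject, definition);
    SchemaFields {
        schema_id: Some(schema_id),
        schema_version: Some(version),
        payload: serialize(event),
    }
}

fn main() {
    let mut registry = MockRegistry::default();
    let event = UserEvent { user_id: 123, action: "login".to_string() };
    let fields = prepare(&mut registry, "user-events", r#"{"type":"record"}"#, &event);
    println!(
        "schema_id={:?} version={:?} payload={}",
        fields.schema_id,
        fields.schema_version,
        String::from_utf8_lossy(&fields.payload)
    );
}
```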
2. Broker Validation
When a message arrives, the broker validates schema fields based on the topic's validation policy:
| Policy | Behavior |
|---|---|
| None | No validation; schema fields are optional |
| Warn | Logs a warning if schema_id is missing or invalid, but accepts the message |
| Enforce | Rejects the message if schema_id is missing, invalid, or doesn't match the topic's requirements |
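The policy table can be read as a small decision function. The sketch below is a hypothetical model, not the broker's actual code. It assumes that "doesn't match topic requirements" means the message's schema_id differs from a single schema the topic expects (`topic_schema_id`), and it uses a closure, `is_registered`, to stand in for a registry lookup. Under `Warn` the same checks run, but a problem only produces a logged warning.

```rust
/// Hypothetical model of the three topic validation policies.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ValidationPolicy {
    None,
    Warn,
    Enforce,
}

#[derive(Debug, PartialEq)]
enum Verdict {
    Accept,
    AcceptWithWarning(String),
    Reject(String),
}

/// Assumptions: `topic_schema_id` is the schema the topic expects (if any),
/// and `is_registered` stands in for a Schema Registry lookup.
fn validate(
    policy: ValidationPolicy,
    schema_id: Option<u64>,
    topic_schema_id: Option<u64>,
    is_registered: impl Fn(u64) -> bool,
) -> Verdict {
    if policy == ValidationPolicy::None {
        return Verdict::Accept; // no validation at all
    }
    // Find the first problem with the message's schema fields, if any.
    let problem = match schema_id {
        None => Some("schema_id missing".to_string()),
        Some(id) if !is_registered(id) => Some(format!("schema_id {id} is not registered")),
        Some(id) => match topic_schema_id {
            Some(expected) if expected != id => {
                Some(format!("schema_id {id} does not match topic schema {expected}"))
            }
            _ => None,
        },
    };
    match (policy, problem) {
        (_, None) => Verdict::Accept,
        (ValidationPolicy::Warn, Some(reason)) => {
            eprintln!("warning: {reason}"); // Warn logs but still accepts
            Verdict::AcceptWithWarning(reason)
        }
        (_, Some(reason)) => Verdict::Reject(reason), // Enforce rejects
    }
}

fn main() {
    // Hypothetical registry: only schema_id 42 is registered; the topic expects 42.
    let registered = |id: u64| id == 42;
    for policy in [ValidationPolicy::None, ValidationPolicy::Warn, ValidationPolicy::Enforce] {
        let verdict = validate(policy, None, Some(42), registered);
        println!("{:?} + missing schema_id -> {:?}", policy, verdict);
    }
}
```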
3. Consumer Flow
```text
┌────────────────┐
│    Consumer    │
│                │
│ 1. Receive     │  StreamMessage {
│    Message     │    schema_id: Some(42),
│                │    schema_version: Some(1),
│ 2. Extract     │    payload: <bytes>,
│    schema_id   │    ...
│                │  }
│ 3. Fetch       │
│    Schema      │────>┌──────────────────┐
│                │     │ Schema Registry  │
│ 4. Get Def     │<────┤ Returns schema   │
│                │     │ definition       │
│ 5. Deserialize │     └──────────────────┘
│    Payload     │
│                │
│ 6. Validate    │  { user_id: 123, action: "login" }
│    Struct      │
│                │
└────────────────┘
```
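Steps 5 and 6 are where schema_version lets one consumer read messages written under different schema versions. The sketch below dispatches on schema_version. The `UserEvent` type, the `decode` and `parse_id` functions, and the pipe-delimited payload layout ("user_id|action" for version 1, "user_id|action|ip" for version 2) are all hypothetical assumptions chosen to keep the example dependency-free. A real consumer would instead deserialize using the definition fetched from the registry.

```rust
/// Hypothetical decoded event; version 2 of the (hypothetical) schema adds `ip`.
#[derive(Debug, PartialEq)]
struct UserEvent {
    user_id: u64,
    action: String,
    ip: Option<String>,
}

fn parse_id(s: &str) -> Result<u64, String> {
    s.parse::<u64>().map_err(|e| format!("bad user_id {s:?}: {e}"))
}

/// Chooses a decoder based on schema_version.
/// Assumed payload layout: "user_id|action" (v1) and "user_id|action|ip" (v2).
fn decode(schema_version: Option<u32>, payload: &[u8]) -> Result<UserEvent, String> {
    let text = std::str::from_utf8(payload).map_err(|e| e.to_string())?;
    let parts: Vec<&str> = text.split('|').collect();
    match (schema_version, parts.as_slice()) {
        (Some(1), [id, action]) => Ok(UserEvent {
            user_id: parse_id(id)?,
            action: action.to_string(),
            ip: None, // field did not exist in version 1
        }),
        (Some(2), [id, action, ip]) => Ok(UserEvent {
            user_id: parse_id(id)?,
            action: action.to_string(),
            ip: Some(ip.to_string()),
        }),
        (Some(v), _) => Err(format!("unsupported schema version {v} or malformed payload")),
        (None, _) => Err("message has no schema_version".to_string()),
    }
}

fn main() {
    println!("{:?}", decode(Some(1), b"123|login"));
    println!("{:?}", decode(Some(2), b"123|login|10.0.0.1"));
}
```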
Benefits of Schema Integration
✅ Type Safety
Messages are validated against a contract, preventing data corruption.
✅ Bandwidth Efficiency
Only 8-12 bytes of overhead per message (schema_id + schema_version) instead of the full schema (potentially kilobytes).
✅ Schema Evolution
Consumers can handle multiple schema versions using the schema_version field.
✅ Debugging
Knowing which schema version produced a message simplifies troubleshooting.
✅ Governance
Centralized schema management with compatibility checking and audit trails.