
Message Structure

Messages are the fundamental unit of data in Danube. Each message contains a payload plus rich metadata for routing, tracking, acknowledgment, and schema validation.


StreamMessage Structure

pub struct StreamMessage {
    pub request_id: u64,
    pub msg_id: MessageID,
    pub payload: Vec<u8>,
    pub publish_time: u64,
    pub producer_name: String,
    pub subscription_name: Option<String>,
    pub attributes: HashMap<String, String>,
    pub schema_id: Option<u64>,
    pub schema_version: Option<u32>,
}

Field Reference

Field              Type                     Description
request_id         u64                      Unique identifier for tracking the message request across the system
msg_id             MessageID                Composite identifier containing routing and location information
payload            Vec<u8>                  The actual message content in binary format
publish_time       u64                      Unix timestamp (milliseconds) when the message was published
producer_name      String                   Name of the producer that sent this message
subscription_name  Option<String>           Name of the subscription (set by broker for consumer delivery)
attributes         HashMap<String, String>  User-defined key-value pairs for custom metadata
schema_id          Option<u64>              Schema Registry ID for message validation (see Schema Integration)
schema_version     Option<u32>              Version of the schema used to serialize this message
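
To make the field layout concrete, here is a minimal sketch that fills in every field of a StreamMessage by hand. It redeclares the two structs documented on this page so that it compiles on its own. The helper names (`now_millis`, `sample_message`), the attribute key, and the broker address are illustrative assumptions, not part of Danube's API. In practice the client library and broker populate most of these fields.

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

// Local mirrors of the structs documented on this page (assumption: declared
// here only so the sketch is self-contained).
#[derive(Debug, Clone, PartialEq)]
pub struct MessageID {
    pub producer_id: u64,
    pub topic_name: String,
    pub broker_addr: String,
    pub topic_offset: u64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct StreamMessage {
    pub request_id: u64,
    pub msg_id: MessageID,
    pub payload: Vec<u8>,
    pub publish_time: u64,
    pub producer_name: String,
    pub subscription_name: Option<String>,
    pub attributes: HashMap<String, String>,
    pub schema_id: Option<u64>,
    pub schema_version: Option<u32>,
}

/// Current Unix time in milliseconds, the unit used by `publish_time`.
fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_millis() as u64
}

/// Hypothetical helper: builds a schema-less message with one attribute.
pub fn sample_message(payload: &[u8]) -> StreamMessage {
    let mut attributes = HashMap::new();
    attributes.insert("content-type".to_string(), "text/plain".to_string());

    StreamMessage {
        request_id: 1,
        msg_id: MessageID {
            producer_id: 7,
            topic_name: "/default/events".to_string(),
            broker_addr: "127.0.0.1:6650".to_string(), // placeholder address
            topic_offset: 0,
        },
        payload: payload.to_vec(),
        publish_time: now_millis(),
        producer_name: "example-producer".to_string(),
        subscription_name: None, // set by the broker for consumer delivery
        attributes,
        schema_id: None,      // no schema attached in this sketch
        schema_version: None,
    }
}
```

Calling `sample_message(b"hello")` yields a message whose payload is the five bytes of `hello` and whose schema fields are both `None`.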

MessageID Structure

The MessageID is a composite identifier that enables efficient routing and acknowledgment:

pub struct MessageID {
    pub producer_id: u64,
    pub topic_name: String,
    pub broker_addr: String,
    pub topic_offset: u64,
}

Field Reference

Field         Type    Description
producer_id   u64     Unique identifier for the producer within this topic
topic_name    String  Full topic name (e.g., /default/events)
broker_addr   String  Address of the broker that delivered this message
topic_offset  u64     Monotonic position of the message within the topic

Purpose:

  • Routing: Broker address enables consumers to send acknowledgments to the correct broker
  • Ordering: Topic offset provides strict ordering guarantees within a topic
  • Deduplication: Combination of producer_id + topic_offset creates a unique message identifier within a topic
  • Tracking: The request_id carried on the enclosing StreamMessage (it is not a MessageID field) links messages across distributed tracing systems
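
The deduplication and ordering points above can be sketched as follows. This is an illustration under stated assumptions, not Danube's own consumer logic: the `Deduplicator` type and `sort_by_offset` function are hypothetical names, the key is only unique within a single topic (add `topic_name` to the key when mixing topics), and the set of seen keys grows without bound.

```rust
use std::collections::HashSet;

// Local mirror of the MessageID struct documented above.
#[derive(Debug, Clone)]
pub struct MessageID {
    pub producer_id: u64,
    pub topic_name: String,
    pub broker_addr: String,
    pub topic_offset: u64,
}

/// Remembers which messages were already seen, keyed by
/// (producer_id, topic_offset). Hypothetical helper for a single topic.
#[derive(Default)]
pub struct Deduplicator {
    seen: HashSet<(u64, u64)>,
}

impl Deduplicator {
    /// Returns true the first time an ID is observed, false on a repeat.
    pub fn first_seen(&mut self, id: &MessageID) -> bool {
        self.seen.insert((id.producer_id, id.topic_offset))
    }
}

/// Puts IDs from one topic back into topic order using `topic_offset`.
pub fn sort_by_offset(ids: &mut [MessageID]) {
    ids.sort_by_key(|id| id.topic_offset);
}
```

A consumer could call `first_seen` before processing a message and skip the message when it returns false, for example after a redelivery.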

Schema Integration

Danube integrates with the Schema Registry to provide type-safe messaging with schema validation.

Schema Fields in StreamMessage

schema_id: Option<u64>

Globally unique identifier assigned by the Schema Registry when a schema is registered.

  • Present when producer uses schema-validated messages
  • Consumers use this ID to fetch the schema from the registry
  • Enables schema caching (8-byte overhead vs. sending full schema per message)
  • Required when topic has validation policy set to Enforce

schema_version: Option<u32>

Version number of the schema within its subject.

  • Tracks which version of the schema was used to serialize this message
  • Enables schema evolution tracking and debugging
  • Allows consumers to handle multiple schema versions gracefully
  • Starts at 1 and auto-increments with each schema update
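
One way a consumer might handle several schema versions is to branch on schema_version before decoding. The sketch below is illustrative only: the `LoginEvent` type, the `DecodeError` enum, and both payload formats (v1 is a bare user ID, v2 is `user_id,action`) are invented for the example and do not describe a real Danube schema.

```rust
/// Hypothetical decoded event; the fields are illustrative only.
#[derive(Debug, PartialEq)]
pub struct LoginEvent {
    pub user_id: u64,
    pub action: String,
}

#[derive(Debug, PartialEq)]
pub enum DecodeError {
    MissingVersion,
    UnsupportedVersion(u32),
    Malformed,
}

/// Decodes a payload according to the schema version it was written with.
/// Assumed formats: v1 = "user_id", v2 = "user_id,action" (both UTF-8 text).
pub fn decode(schema_version: Option<u32>, payload: &[u8]) -> Result<LoginEvent, DecodeError> {
    let text = std::str::from_utf8(payload).map_err(|_| DecodeError::Malformed)?;
    match schema_version {
        None => Err(DecodeError::MissingVersion),
        Some(1) => {
            let user_id = text.trim().parse::<u64>().map_err(|_| DecodeError::Malformed)?;
            // v1 had no action field, so fall back to a default value.
            Ok(LoginEvent { user_id, action: "unknown".to_string() })
        }
        Some(2) => {
            let (id, action) = text.split_once(',').ok_or(DecodeError::Malformed)?;
            let user_id = id.trim().parse::<u64>().map_err(|_| DecodeError::Malformed)?;
            Ok(LoginEvent { user_id, action: action.trim().to_string() })
        }
        Some(v) => Err(DecodeError::UnsupportedVersion(v)),
    }
}
```

Returning an explicit `UnsupportedVersion` error, rather than guessing at the layout, lets an older consumer fail loudly when a producer has moved to a version it does not know.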

How Schema Validation Works

1. Producer Flow

┌─────────────┐
│  Producer   │
│             │
│ 1. Register │     ┌──────────────────┐
│    Schema   ├────>│ Schema Registry  │
│             │     │                  │
│ 2. Get ID   │<────┤ Returns:         │
│             │     │ - schema_id: 42  │
│             │     │ - version: 1     │
│ 3. Serialize│     └──────────────────┘
│    Message  │
│             │
│ 4. Set      │     StreamMessage {
│    Fields   │       schema_id: Some(42),
│             │       schema_version: Some(1),
│             │       payload: <serialized>,
│ 5. Send     │       ...
│             │     }
└──────┬──────┘
       │
       v
  ┌─────────┐
  │ Broker  │
  └─────────┘

2. Broker Validation

When a message arrives, the broker validates schema fields based on the topic's validation policy:

Policy   Behavior
None     No validation; schema fields optional
Warn     Logs warning if schema_id missing or invalid; allows message
Enforce  Rejects message if schema_id missing, invalid, or doesn't match topic requirements
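
The policy table can be read as a small decision function. The sketch below is a simplification under stated assumptions, not the broker's implementation: `ValidationPolicy`, `Decision`, and `check_schema` are hypothetical names, and "invalid" and "doesn't match topic requirements" are collapsed into a single comparison against one expected schema ID.

```rust
/// The three validation policies from the table above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ValidationPolicy {
    None,
    Warn,
    Enforce,
}

/// Outcome of the check; hypothetical type for this sketch.
#[derive(Debug, PartialEq)]
pub enum Decision {
    Accept,
    AcceptWithWarning(String),
    Reject(String),
}

/// Applies a policy to a message's schema_id.
/// Assumption: `expected_schema_id` stands in for the topic's requirements.
pub fn check_schema(
    policy: ValidationPolicy,
    schema_id: Option<u64>,
    expected_schema_id: u64,
) -> Decision {
    // First work out whether anything is wrong, independent of policy.
    let problem = match schema_id {
        None => Some("schema_id is missing".to_string()),
        Some(id) if id != expected_schema_id => Some(format!(
            "schema_id {} does not match expected {}",
            id, expected_schema_id
        )),
        Some(_) => None,
    };

    // Then let the policy decide what a problem means.
    match (policy, problem) {
        (ValidationPolicy::None, _) | (_, None) => Decision::Accept,
        (ValidationPolicy::Warn, Some(p)) => Decision::AcceptWithWarning(p),
        (ValidationPolicy::Enforce, Some(p)) => Decision::Reject(p),
    }
}
```

Separating "what is wrong" from "what to do about it" keeps the three policies to one line each, which mirrors how the table is laid out.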

3. Consumer Flow

┌───────────────┐
│   Consumer    │
│               │
│ 1. Receive    │     StreamMessage {
│    Message    │       schema_id: Some(42),
│               │       schema_version: Some(1),
│ 2. Extract    │       payload: <bytes>,
│    schema_id  │       ...
│               │     }
│ 3. Fetch      │
│    Schema     ├────>┌──────────────────┐
│               │     │ Schema Registry  │
│ 4. Get Def    │<────┤ Returns schema   │
│               │     │ definition       │
│ 5. Deserialize│     └──────────────────┘
│    Payload    │
│               │
│ 6. Validate   │     { user_id: 123, action: "login" }
│    Struct     │
│               │
└───────────────┘
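
Steps 2 to 4 of the consumer flow lend themselves to caching, since many messages share one schema_id. The following sketch shows a cache that asks the registry at most once per successfully resolved ID. Everything here is an assumption made for illustration: the `SchemaSource` trait stands in for a registry client, and `SchemaCache` and `CountingSource` are hypothetical types, not Danube APIs.

```rust
use std::cell::Cell;
use std::collections::HashMap;

/// Stand-in for a Schema Registry client (hypothetical trait).
pub trait SchemaSource {
    fn fetch(&self, schema_id: u64) -> Option<String>;
}

/// Caches schema definitions by schema_id so each one is fetched only once.
pub struct SchemaCache<S: SchemaSource> {
    source: S,
    cached: HashMap<u64, String>,
}

impl<S: SchemaSource> SchemaCache<S> {
    pub fn new(source: S) -> Self {
        Self { source, cached: HashMap::new() }
    }

    /// Returns the schema definition, fetching it on the first request only.
    /// Unknown IDs are not cached, so they are looked up again next time.
    pub fn get(&mut self, schema_id: u64) -> Option<&str> {
        if !self.cached.contains_key(&schema_id) {
            let definition = self.source.fetch(schema_id)?;
            self.cached.insert(schema_id, definition);
        }
        self.cached.get(&schema_id).map(String::as_str)
    }

    pub fn source(&self) -> &S {
        &self.source
    }
}

/// In-memory source that counts how often it is asked, for demonstration.
pub struct CountingSource {
    pub schemas: HashMap<u64, String>,
    pub fetches: Cell<u32>,
}

impl SchemaSource for CountingSource {
    fn fetch(&self, schema_id: u64) -> Option<String> {
        self.fetches.set(self.fetches.get() + 1);
        self.schemas.get(&schema_id).cloned()
    }
}
```

With this shape, a consumer would call `cache.get(id)` for each message that carries `schema_id: Some(id)` and then hand the returned definition to whatever deserializer matches the schema format.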

Benefits of Schema Integration

✅ Type Safety
Messages are validated against a contract, preventing data corruption.

✅ Bandwidth Efficiency
Only 8 to 12 bytes of overhead per message (an 8-byte schema_id plus an optional 4-byte schema_version) instead of the full schema (potentially kilobytes).

✅ Schema Evolution
Consumers can handle multiple schema versions using the schema_version field.

✅ Debugging
Knowing which schema version produced a message simplifies troubleshooting.

✅ Governance
Centralized schema management with compatibility checking and audit trails.
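
The bandwidth figure follows directly from the field types: a u64 is 8 bytes and a u32 is 4 bytes. The short sketch below works through that arithmetic. It counts raw field widths only, and the actual on-the-wire size depends on the serialization format, which this page does not specify. The constant and function names are hypothetical.

```rust
use std::mem::size_of;

/// Raw bytes needed to reference a schema: a u64 ID plus a u32 version.
pub const SCHEMA_REF_BYTES: usize = size_of::<u64>() + size_of::<u32>();

/// How many times larger an inline schema definition would be than the
/// fixed-size reference carried in each message.
pub fn inline_overhead_ratio(schema_definition: &str) -> f64 {
    schema_definition.len() as f64 / SCHEMA_REF_BYTES as f64
}
```

For a 1,200-byte schema definition, the ratio is 100: sending the definition inline would cost a hundred times more per message than sending the 12-byte reference.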