Danube Connect Architecture
High-level design and core concepts
Design Philosophy
Isolation & Safety
The Danube broker remains pure Rust with zero knowledge of external systems. Connectors run as separate processes, ensuring that a failing connector never crashes the broker.
Key principle: Process-level boundaries guarantee fault isolation; a misconfigured PostgreSQL connector cannot impact message delivery for other topics or tenants.
Shared Core Library
The danube-connect-core SDK eliminates boilerplate by handling the generic runtime concerns that every connector shares.
Connector developers implement simple traits:
- SourceConnector - Import data into Danube
- SinkConnector - Export data from Danube
Core SDK handles:
- Danube client connection management
- Configuration loading helpers and shared root config
- Sink batching, batch flush timing, and acknowledgments
- Source polling loops or streaming envelope drain loops
- Schema-aware serialization and deserialization
- Automatic retry with exponential backoff
- Prometheus metrics export and health checks
- Signal handling and graceful shutdown
Architecture Layers
External Systems (MQTT, databases, APIs)
↓
Your Connector Logic (implement trait)
↓
danube-connect-core (SDK framework)
↓
danube-client (gRPC communication)
↓
Danube Broker Cluster
Separation of concerns:
- Connector responsibility: External system integration and data transformation
- SDK responsibility: Danube communication, lifecycle, retries, metrics
Connector Types
Source Connectors (External → Danube)
Poll or subscribe to external systems and publish messages to Danube topics.
Source connectors can now run in two modes:
- Polling - the runtime calls poll() on an interval
- Streaming - the connector pushes SourceEnvelope values to the runtime with SourceSender
Key responsibilities:
- Connect to external system
- Read or receive external events
- Transform the external format into SourceRecord or SourceEnvelope
- Route to appropriate Danube topics
- Optionally manage offsets/checkpoints through Offset
Sink Connectors (Danube → External)
Consume messages from Danube topics and write to external systems.
Key responsibilities:
- Subscribe to Danube topics
- Transform Danube messages to external format
- Perform connector-specific validation and routing
- Execute bulk writes to the external system from process_batch()
The danube-connect-core SDK
Core Traits
Connectors implement the danube-connect-core traits:
SourceConnector
#[async_trait]
pub trait SourceConnector {
    async fn initialize(&mut self, config: ConnectorConfig) -> Result<()>;
    async fn producer_configs(&self) -> Result<Vec<ProducerConfig>>;

    fn mode(&self) -> SourceConnectorMode {
        SourceConnectorMode::Polling
    }

    async fn start_streaming(&mut self, sender: SourceSender) -> Result<()>;
    async fn poll(&mut self) -> Result<Vec<SourceEnvelope>>;
    async fn commit(&mut self, offsets: Vec<Offset>) -> Result<()>;
    async fn shutdown(&mut self) -> Result<()>;
    async fn health_check(&self) -> Result<()>;
}
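For streaming sources, a connector overrides mode() to return SourceConnectorMode::Streaming and pushes envelopes from start_streaming() instead of waiting for poll(). The fragment below is a minimal sketch: it assumes SourceSender exposes an async, fallible send(SourceEnvelope) method, and next_event() is a hypothetical helper on the connector, so the exact signatures may differ.

fn mode(&self) -> SourceConnectorMode {
    SourceConnectorMode::Streaming
}

async fn start_streaming(&mut self, sender: SourceSender) -> Result<()> {
    // Drain events from the external system until it closes the stream.
    // next_event() is a hypothetical helper returning Option<serde_json::Value>.
    while let Some(event) = self.next_event().await? {
        let record = SourceRecord::new("/events/webhooks", event);
        // Assumption: SourceSender::send is async and returns a Result.
        sender.send(record.into()).await?;
    }
    Ok(())
}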
SinkConnector
#[async_trait]
pub trait SinkConnector {
    async fn initialize(&mut self, config: ConnectorConfig) -> Result<()>;
    async fn consumer_configs(&self) -> Result<Vec<ConsumerConfig>>;
    async fn process_batch(&mut self, records: Vec<SinkRecord>) -> Result<()>;
    async fn shutdown(&mut self) -> Result<()>;
    async fn health_check(&self) -> Result<()>;
}
Message Types
SinkRecord - Message consumed from Danube (Danube → External System)
The runtime handles schema-aware deserialization automatically. Your connector receives typed data as serde_json::Value, already deserialized based on the message's schema.
API:
- payload() - Returns &serde_json::Value (typed data, not raw bytes)
- as_type<T>() - Deserialize the payload to a specific Rust type
- schema() - Get schema information (subject, version, type) if the message has a schema
- attributes() - Metadata key-value pairs from the producer
- topic(), publish_time(), producer_name() - Message metadata
- routing_context() / context() - Generic routing and record context helpers
Example:
async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
    for record in records {
        // Access as typed data (already deserialized by runtime)
        let payload = record.payload();
        let user_id = payload["user_id"].as_str().unwrap();

        // Or deserialize to struct
        let event: MyEvent = record.as_type()?;

        // Access generic routing context
        let context = record.context();
        println!("topic={} producer={:?}", context.topic(), context.producer_name());

        if let Some(schema) = record.schema() {
            println!("Schema: {} v{}", schema.subject, schema.version);
        }

        let _ = (user_id, event);
    }
    Ok(())
}
SourceRecord - Message to publish to Danube (External System → Danube)
You provide typed data as serde_json::Value. The runtime handles schema-aware serialization before sending to Danube.
API:
- new(topic, payload) - Create from serde_json::Value
- from_json(topic, data) - Create from any JSON-serializable type
- from_string(topic, text) - Create from a string
- from_number(topic, value) - Create from numeric payloads
- from_bytes(topic, bytes) - Create from binary payloads
- with_attribute(key, value) - Add a metadata attribute
- with_attributes(map) - Add multiple attributes
- with_key(key) - Set the routing key for partitioning
- payload() / context() - Access the payload and generic routing context
Example:
use serde_json::json;
use danube_connect_core::{Offset, SourceEnvelope};

// From JSON value
let record = SourceRecord::new("/events/users", json!({
    "user_id": "123",
    "action": "login"
})).with_attribute("source", "api");

// From struct (automatically serialized to JSON)
#[derive(Serialize)]
struct Event { user_id: String, action: String }

let event = Event { user_id: "123".into(), action: "login".into() };
let record = SourceRecord::from_json("/events/users", &event)?
    .with_attribute("source", "api")
    .with_key(&event.user_id); // For partitioning

// Attach an offset/checkpoint when needed
let envelope = SourceEnvelope::with_offset(record, Offset::new("api-page", 42));
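After the runtime has successfully published an envelope, the offsets attached to it are handed back through commit(). A minimal sketch, assuming the connector keeps its own checkpoint store (self.checkpoints and its save method are hypothetical):

async fn commit(&mut self, offsets: Vec<Offset>) -> Result<()> {
    for offset in offsets {
        // Persist the checkpoint so a restart resumes from the last committed position.
        self.checkpoints.save(&offset).await?; // hypothetical checkpoint store
    }
    Ok(())
}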
Runtime Management
The SDK provides SourceRuntime and SinkRuntime that handle:
- Lifecycle - Initialize connector, create Danube clients, start message loops
- Message processing - Polling or streaming for sources, runtime-managed batching for sinks
- Error handling - Automatic retry with exponential backoff
- Schema handling - Serialize source payloads and deserialize sink payloads
- Acknowledgments - Ack successfully processed sink messages and commit source offsets
- Shutdown - Graceful cleanup on SIGTERM/SIGINT
- Observability - Expose Prometheus metrics and health endpoints
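As an illustration of how the pieces fit together, a sink connector binary loads its configuration, builds the connector, and hands it to SinkRuntime, which then drives everything in the list above. The constructor and method names below are assumptions, not the documented runtime API:

#[tokio::main]
async fn main() -> Result<()> {
    // Load the shared root config; the SDK provides configuration loading helpers.
    let config = ConnectorConfig::load()?;          // assumed helper name
    let connector = MyClickHouseSink::default();    // your SinkConnector implementation
    // The runtime owns batching, retries, acknowledgments, metrics, and graceful shutdown.
    SinkRuntime::new(connector, config).run().await // assumed entry point
}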
Multi-Topic Support
A single connector can handle multiple topics with different configurations.
Source connector example:
- One MQTT connector routes sensors/# → /iot/sensors (8 partitions, reliable)
- The same connector routes debug/# → /iot/debug (1 partition, non-reliable)
Sink connector example:
- One ClickHouse connector consumes from /analytics/events and /analytics/metrics
- It writes each topic to a different table in the same database (see the sketch below)
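A sketch of how such a sink's process_batch() might branch on the source topic; the table names and the self.db helper are hypothetical stand-ins for a ClickHouse client:

async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
    for record in records {
        // Route each record to a table based on the Danube topic it came from.
        let table = if record.topic() == "/analytics/events" {
            "events_table"
        } else {
            "metrics_table"
        };
        // `self.db.insert` stands in for a ClickHouse client call.
        self.db.insert(table, record.payload()).await?;
    }
    Ok(())
}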
Schema Registry Integration
Critical feature: The runtime automatically handles schema-aware serialization and deserialization, eliminating the need for connectors to manage schema logic.
How It Works
For Sink Connectors (Consuming):
- Message arrives with schema ID
- Runtime fetches schema from registry (cached for performance)
- Runtime deserializes payload based on schema type (JSON, String, Bytes, Avro, Protobuf)
- Your connector receives a SinkRecord with a typed serde_json::Value payload
- You process the typed data without worrying about raw bytes or schemas
For Source Connectors (Publishing):
- You create a SourceRecord with a serde_json::Value payload
- The runtime serializes the payload based on the configured schema type
- Runtime registers/validates schema with registry
- Message is sent to Danube with schema ID attached
- Consumers automatically deserialize using the schema
Schema Configuration
Define schemas per topic in your connector configuration:
# Core Danube settings
danube_service_url = "http://danube-broker:6650"
connector_name = "my-connector"
# Schema mappings for topics
[[schemas]]
topic = "/events/users"
subject = "user-events"
schema_type = "json_schema"
schema_file = "schemas/user-event.json"
version_strategy = "latest" # or "pinned" or "minimum"
[[schemas]]
topic = "/iot/sensors"
subject = "sensor-data"
schema_type = "json_schema"
schema_file = "schemas/sensor.json"
version_strategy = { pinned = 2 } # Pin to version 2
[[schemas]]
topic = "/raw/telemetry"
subject = "telemetry"
schema_type = "bytes" # Binary data
Supported Schema Types
| Type | Description | Serialization | Use Case |
|---|---|---|---|
| json_schema | JSON Schema | JSON | Structured events, APIs |
| string | UTF-8 text | String bytes | Logs, text messages |
| bytes | Raw binary | Base64 in JSON | Binary data, images |
| number | Numeric data | JSON number | Metrics, counters |
| avro | Apache Avro | JSON payload with Avro validation | High-performance schemas |
| protobuf | Protocol Buffers | Planned | gRPC, microservices |
Note: JSON Schema, String, Bytes, Number, and Avro flows are supported in the current runtime model. Protobuf serialization/deserialization is not implemented yet.
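These schema types map directly onto the SourceRecord constructors listed earlier. A small sketch (the constructors are treated as infallible here, and topic paths other than /raw/telemetry are illustrative):

// String, bytes, and number payloads use the dedicated constructors;
// the runtime serializes each one according to the topic's configured schema type.
let log_line = SourceRecord::from_string("/logs/app", "user 123 logged in");
let telemetry = SourceRecord::from_bytes("/raw/telemetry", vec![0x01, 0x02, 0x03]);
let cpu_load = SourceRecord::from_number("/metrics/cpu", 0.87);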
Version Strategies
Control schema evolution with version strategies:
# Latest version (default)
version_strategy = "latest"
# Pin to specific version
version_strategy = { pinned = 3 }
# Minimum version (use >= version)
version_strategy = { minimum = 2 }
Connector Benefits
What you DON'T need to do:
- ❌ Fetch schemas from registry
- ❌ Deserialize/serialize payloads manually
- ❌ Handle schema versions
- ❌ Manage schema caching
- ❌ Deal with raw bytes
What you DO:
- ✅ Work with typed serde_json::Value data
- ✅ Focus on business logic
- ✅ Let the runtime handle all schema operations
Example: Schema-Aware Sink
async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
    for record in records {
        let payload = record.payload();

        if let Some(schema) = record.schema() {
            info!("Processing message with schema: {} v{}",
                schema.subject, schema.version);
        }

        let user_id = payload["user_id"].as_str()
            .ok_or_else(|| ConnectorError::invalid_data("Missing user_id", vec![]))?;

        #[derive(Deserialize)]
        struct UserEvent {
            user_id: String,
            action: String,
            timestamp: u64,
        }

        let event: UserEvent = record.as_type()?;
        self.database.insert_user_event(event).await?;

        let _ = user_id;
    }
    Ok(())
}
Example: Schema-Aware Source
async fn poll(&mut self) -> ConnectorResult<Vec<SourceEnvelope>> {
    let external_data = self.client.fetch_events().await?;

    let records: Vec<SourceEnvelope> = external_data
        .into_iter()
        .map(|event| {
            SourceRecord::from_json("/events/users", &event)
                .unwrap()
                .with_attribute("source", "external-api")
                .into()
        })
        .collect();

    Ok(records)
}
The runtime automatically:
- Serializes each record's payload based on the /events/users topic's configured schema
- Registers the schema with the registry if needed
- Attaches schema ID to messages
- Handles schema validation errors
Configuration Pattern
Connectors use a single configuration file combining core Danube settings with connector-specific settings:
# Core Danube settings (provided by danube-connect-core)
danube_service_url = "http://danube-broker:6650"
connector_name = "mqtt-bridge"
[processing]
batch_size = 500
batch_timeout_ms = 1000
poll_interval_ms = 100
metrics_port = 9090
# Connector-specific settings
[mqtt]
broker_host = "mosquitto"
broker_port = 1883
[[mqtt.routes]]
from = "sensors/#"
to = "/iot/sensors"
partitions = 8
Environment variable overrides:
- Mandatory fields: DANUBE_SERVICE_URL, CONNECTOR_NAME
- Secrets and connection endpoints: MQTT_PASSWORD, QDRANT_URL, API keys, etc.
- Structural config such as routes, from, to, subscriptions, and processing settings should stay in TOML
See: Configuration Guide for complete details
Deployment Architecture
Connectors run as standalone Docker containers alongside your Danube cluster:
┌─────────────────────────────┐
│ External Systems │
│ (MQTT, DBs, APIs) │
└──────────┬──────────────────┘
↓
┌──────────────────────────────┐
│ Connector Layer │
│ (Docker Containers) │
│ ┌────────┐ ┌────────┐ │
│ │ MQTT │ │ Delta │ ... │
│ │ Source │ │ Sink │ │
│ └────┬───┘ └───┬────┘ │
└───────┼──────────┼───────────┘
↓ ↓
┌───────────────────────────────┐
│ Danube Broker Cluster │
└───────────────────────────────┘
Scaling:
- Connectors scale independently from brokers
- Run multiple instances for high availability
- No impact on broker resources
Error Handling
The SDK categorizes errors into three types:
- Retryable - Temporary failures (network issues, rate limits) → automatic retry with exponential backoff
- Invalid Data - Malformed messages that can never be processed → log and skip
- Fatal - Unrecoverable errors → graceful shutdown, let orchestrator restart
Your connector: Return the appropriate error type; the SDK handles the rest, as in the sketch below.
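For example, a sink might map deserialization failures to an invalid-data error (so the SDK logs and skips the record) while propagating external-system failures with ?, letting the SDK retry them. MyEvent and self.database are placeholders:

async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()> {
    for record in records {
        // Malformed payloads can never succeed: report them as invalid data.
        let event: MyEvent = record
            .as_type()
            .map_err(|_| ConnectorError::invalid_data("Malformed event payload", vec![]))?;
        // Transient failures are propagated; the runtime retries with exponential backoff.
        self.database.insert_event(event).await?;
    }
    Ok(())
}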
Observability
Every connector automatically exposes:
Prometheus metrics (http://connector:9090/metrics):
- connector_messages_received / connector_messages_sent
- connector_errors_total
- connector_processing_duration_seconds
Health checks (http://connector:9090/health):
healthy, degraded, or unhealthy
Structured logging:
- JSON-formatted logs with trace IDs
- Configurable log levels via the LOG_LEVEL env var
Next Steps
- Build Source Connector - How to build a source connector
- Build Sink Connector - How to build a sink connector
- GitHub Repository - Source code and examples
- danube-connect-core API - SDK documentation