Sink Connector Development Guide
Build sink connectors to export data from Danube to any external system
Overview
A sink connector exports data from Danube topics to external systems. The connector consumes typed SinkRecord values, transforms them, and writes them to the target system. The runtime owns Danube consumption, buffering, batch flush timing, schema-aware deserialization, acknowledgments, health scheduling, and metrics export.
Examples: Danube → Delta Lake, Danube → ClickHouse, Danube → Vector DB, Danube → HTTP API
Core Concepts
Division of Responsibilities
| You Handle | Runtime Handles |
|---|---|
| Connect to external system | Connect to Danube broker |
| Transform SinkRecord data | Consume from Danube topics |
| Write to target system | Schema deserialization & expected-subject checks |
| Connector-specific routing/grouping | Message buffering, batching, and acknowledgments |
| Error classification for target failures | Lifecycle, health scheduling, and retry policy |
| External-system cleanup | Metrics export and observability |
Key insight: You receive typed `serde_json::Value` data already deserialized by the runtime. No manual schema operations needed.
SinkConnector Trait
#[async_trait]
pub trait SinkConnector: Send + Sync {
async fn initialize(&mut self, config: ConnectorConfig) -> ConnectorResult<()>;
async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>>;
async fn process_batch(&mut self, records: Vec<SinkRecord>) -> ConnectorResult<()>;
async fn shutdown(&mut self) -> ConnectorResult<()> { Ok(()) }
async fn health_check(&self) -> ConnectorResult<()> { Ok(()) }
}
Required: `initialize`, `consumer_configs`, `process_batch`
Optional: `shutdown`, `health_check`
Configuration Architecture
Unified Configuration Pattern
Use a single configuration struct combining core Danube settings with connector-specific settings:
#[derive(Deserialize)]
pub struct MySinkConfig {
#[serde(flatten)]
pub core: ConnectorConfig, // Danube settings (no schemas for sinks!)
pub my_sink: MySettings, // Connector-specific
}
Critical for Sink Connectors:
- ✅ Sink connectors typically leave `config.core.schemas` empty
- ✅ Runtime deserializes payloads before your connector sees them
- ✅ Per-route `expected_schema_subject` can enforce an expected contract at consume time
- ✅ Schema info remains available through `record.schema()` and `record.context()`
TOML Structure
# At root level (flattened from ConnectorConfig)
danube_service_url = "http://danube-broker:6650"
connector_name = "my-sink"
[processing]
batch_size = 500
batch_timeout_ms = 1000
metrics_port = 9090
# Connector-specific section
[my_sink]
target_url = "http://database.example.com"
connection_pool_size = 10
# Routes with subscription details
[[my_sink.routes]]
from = "/events/users"
subscription = "sink-group-1"
subscription_type = "Shared"
to = "users"
expected_schema_subject = "user-events-v1" # Optional: verify schema
[[my_sink.routes]]
from = "/events/orders"
subscription = "sink-group-1"
subscription_type = "Shared"
to = "orders"
Configuration Loading
Mechanism:
- Load the TOML file from the `CONNECTOR_CONFIG_PATH` environment variable
- Apply environment variable overrides (secrets only)
- Validate configuration
- Pass to connector and runtime
Environment overrides:
- Core: `DANUBE_SERVICE_URL`, `CONNECTOR_NAME`
- Secrets: database passwords, API keys, credentials
Not overridable: `routes`, `from`, `to`, subscription settings, and shared processing behavior (must stay in TOML)
Implementation Steps
1. Initialize Connection
async fn initialize(&mut self, config: ConnectorConfig)
Purpose: Establish connection to external system and prepare for writing.
What to do:
- Create client/connection to target system
- Configure authentication (credentials, API tokens, certificates)
- Set up connection pools for parallel writes
- Validate write permissions with test write/query
- Create tables/collections/indexes if needed
- Store the client in the struct for later use in `process_batch()`
Error handling:
- Return `ConnectorError::config` for configuration mistakes
- Return `ConnectorError::fatal` for connection or setup failures
- Log detailed error context
- Fail fast - don't retry in `initialize()`
Examples:
- Database: Create connection pool, validate table schema, test insert
- HTTP API: Set up client with auth headers, test endpoint availability
- Object Storage: Configure S3/GCS client, verify bucket access
- Search Engine: Connect, create index if needed, validate mappings
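The fail-fast rule above can be sketched as a pure validation step run at the top of `initialize()`. All types here (`InitError`, `MySettings`) are illustrative stand-ins, not the actual SDK API:

```rust
// Stand-in mirroring the guide's ConnectorError::config / ConnectorError::fatal
// distinction (names are illustrative, not the real SDK types).
#[derive(Debug)]
enum InitError {
    Config(String), // configuration mistake: fix the TOML, don't retry
    Fatal(String),  // connection/setup failure: shut the connector down
}

// Hypothetical connector-specific settings.
struct MySettings {
    target_url: String,
    connection_pool_size: usize,
}

// Fail fast: reject bad configuration before touching the network.
fn validate_settings(s: &MySettings) -> Result<(), InitError> {
    if s.target_url.is_empty() {
        return Err(InitError::Config("target_url must not be empty".into()));
    }
    if s.connection_pool_size == 0 {
        return Err(InitError::Config("connection_pool_size must be > 0".into()));
    }
    Ok(())
}
```

A real `initialize()` would call this first, then establish the client connection and return `InitError::Fatal`-style errors for connection failures.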
2. Define Consumer Configs
async fn consumer_configs(&self) -> ConnectorResult<Vec<ConsumerConfig>>
Purpose: Tell the runtime which Danube topics to consume from and how.
What to do:
- Map your topic configuration to Danube consumer configs
- For each consumed topic or route, create a `ConsumerConfig`
- Specify the subscription type (Shared, Exclusive, FailOver)
- Set consumer name and subscription name
- Optionally specify expected schema subject for validation
Consumer config structure:
ConsumerConfig {
topic: "/events/users",
consumer_name: "my-sink-1",
subscription: "my-sink-group",
subscription_type: SubscriptionType::Shared,
expected_schema_subject: Some("user-events-v1"), // Optional
}
Subscription types:
- Shared: Load balancing across multiple consumers (most common)
- Exclusive: Single consumer, ordered processing
- FailOver: Active-passive, automatic failover
Called once at startup after initialize(). Runtime creates consumers based on these configs.
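A minimal sketch of mapping configured routes to consumer configs, using stand-in types (the real `ConsumerConfig` and `SubscriptionType` come from the SDK; the `Route` struct and naming scheme are assumptions):

```rust
// Stand-in types; the real ones come from the connector SDK.
#[derive(Debug, Clone, PartialEq)]
enum SubscriptionType { Shared, Exclusive, FailOver }

struct Route {
    from: String,                            // Danube topic
    subscription: String,                    // subscription name
    expected_schema_subject: Option<String>, // optional contract check
}

struct ConsumerConfig {
    topic: String,
    consumer_name: String,
    subscription: String,
    subscription_type: SubscriptionType,
    expected_schema_subject: Option<String>,
}

// One ConsumerConfig per configured route, as consumer_configs() would return.
fn build_consumer_configs(connector_name: &str, routes: &[Route]) -> Vec<ConsumerConfig> {
    routes.iter().enumerate().map(|(i, r)| ConsumerConfig {
        topic: r.from.clone(),
        consumer_name: format!("{connector_name}-{i}"),
        subscription: r.subscription.clone(),
        subscription_type: SubscriptionType::Shared,
        expected_schema_subject: r.expected_schema_subject.clone(),
    }).collect()
}
```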
3. Process Records
async fn process_batch(&mut self, records: Vec<SinkRecord>)
Purpose: Transform a runtime-managed batch and write it to the external system.
Called: When runtime buffering reaches the configured batch size or batch timeout.
What to do:
- Inspect the batch - Records are already deserialized by the runtime
- Group if needed - Group by route, table, collection, or target entity
- Transform - Convert records into the external system's write model
- Bulk write - Execute the connector's external write path
- Handle partials intentionally - Skip bad records inside the batch if that is your policy
- Return appropriate error type - Retryable only for truly transient batch failures
SinkRecord API:
let payload = record.payload(); // &serde_json::Value
let topic = record.topic(); // &str
let timestamp = record.publish_time(); // u64 (microseconds)
let attributes = record.attributes(); // &HashMap<String, String>
let context = record.context(); // generic routing + schema context
// Check schema info if message has schema
if let Some(schema) = record.schema() {
// schema.subject, schema.version, schema.schema_type
}
// Deserialize to struct (type-safe)
let event: MyStruct = record.as_type()?;
Runtime provides:
- Pre-deserialized JSON data
- Expected-schema checks when configured
- Type-safe access methods
- Logical topic metadata and generic record context
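The "group if needed" step can be sketched with a plain `HashMap`; the `SinkRecord` here is a minimal stand-in with only the fields this example uses:

```rust
use std::collections::HashMap;

// Stand-in for the runtime's SinkRecord; only the fields used here.
struct SinkRecord {
    topic: String,
    payload: String,
}

// Group a runtime-delivered batch by topic so each target table/collection
// gets a single bulk write, as suggested for multi-route connectors.
fn group_by_topic(records: Vec<SinkRecord>) -> HashMap<String, Vec<SinkRecord>> {
    let mut groups: HashMap<String, Vec<SinkRecord>> = HashMap::new();
    for r in records {
        groups.entry(r.topic.clone()).or_default().push(r);
    }
    groups
}
```

Inside `process_batch()` you would iterate the groups and issue one bulk write per target entity.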
4. Runtime-Managed Batching
The sink runtime manages buffering and flush timing using the shared root [processing] settings.
Controls:
- `processing.batch_size`
- `processing.batch_timeout_ms`
What this means for connector authors:
- Do not add connector-owned batch size or flush timer settings unless the external platform truly requires a second, platform-specific layer
- Treat `process_batch()` as the primary sink write path
- If your connector supports multiple routes, group the incoming batch by `record.topic()` or by route target inside `process_batch()`
- If you need lower latency or higher throughput, tune the shared `[processing]` block rather than inventing connector-local batching knobs
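For example, the shared batching controls live in the root `[processing]` block of the TOML (values are illustrative):

```toml
[processing]
batch_size = 500        # flush when 500 records are buffered...
batch_timeout_ms = 1000 # ...or after 1000 ms, whichever comes first
```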
5. Error Handling
Error types guide runtime behavior:
| Error Type | When to Use | Runtime Action |
|---|---|---|
| `Retryable` | Temporary failures (rate limit, connection) | Retry with backoff |
| `Fatal` | Permanent failures (auth, bad config) | Shutdown connector |
| `InvalidData` | Batch content is malformed or unusable | Non-retryable failure |
| `Ok(())` | Batch succeeded | Acknowledge message |
Important: The runtime only retries `Retryable` errors. If you return `InvalidData`, that batch is treated as a non-retryable failure.
Recommended pattern:
- Handle per-record skippable bad data inside `process_batch()` when possible
- Return `Retryable` only when the whole batch should be retried
- Return `Fatal` when continuing would be unsafe or pointless
Partial batch failures: For systems that support partial success, handle the split inside the connector and return an error only for the portion that genuinely failed under your chosen policy.
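The classification table above can be sketched as a pure mapping function. The enum and the HTTP-status mapping are illustrative assumptions; a real connector would map its own client library's error types:

```rust
// Stand-in outcome enum mirroring the error table (not the real SDK type).
#[derive(Debug, PartialEq)]
enum BatchOutcome { Retryable, Fatal, InvalidData }

// Illustrative mapping for an HTTP-based sink target.
fn classify_http_status(status: u16) -> BatchOutcome {
    match status {
        429 | 502 | 503 | 504 => BatchOutcome::Retryable, // transient: back off and retry
        401 | 403 => BatchOutcome::Fatal,                 // auth/config: shut down
        400 | 422 => BatchOutcome::InvalidData,           // bad payload: non-retryable
        500..=599 => BatchOutcome::Retryable,             // other server errors: retry
        _ => BatchOutcome::Fatal,                         // unknown: fail safe
    }
}
```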
6. Graceful Shutdown
async fn shutdown(&mut self)
Purpose: Clean up resources and flush pending data before stopping.
For most connectors, this means finishing in-flight external work and closing clients cleanly. The runtime already owns Danube-side buffering.
What to do:
- Complete any in-flight external writes
- Close connections to external system
- Save any state if needed
- Log shutdown completion
Runtime guarantees:
- `shutdown()` is called on SIGTERM/SIGINT
- No new messages are delivered after shutdown starts
- Time is allotted to complete graceful shutdown
Schema Handling in Sink Connectors
How Schemas Work
For sink connectors, schemas are handled differently than sources:
1. The producer side (source connector or app) validates and attaches the schema
2. The message is stored in Danube with a schema ID
3. The runtime fetches the schema from the registry when consuming
4. The runtime deserializes the payload based on the schema type
5. You receive validated, typed `serde_json::Value`
You never:
- ❌ Validate schemas (the producer already did)
- ❌ Deserialize raw bytes (the runtime already did)
- ❌ Fetch schemas from the registry (the runtime caches them)
- ❌ Configure schemas in `ConnectorConfig` (no `[[schemas]]` section)
You can:
- ✅ Access schema metadata via `record.schema()`
- ✅ Specify `expected_schema_subject` for validation
- ✅ Use different logic based on schema version
- ✅ Access pre-validated typed data
Expected Schema Subject
Optional verification that messages have expected schema:
[[my_sink.routes]]
from = "/events/users"
expected_schema_subject = "user-events-v1" # Verify schema subject
What happens:
- Runtime checks if message schema subject matches
- Mismatch results in a non-retryable consume-side failure for that batch
- Useful for ensuring data contracts
When to use:
- ✅ Strict data contracts required
- ✅ Multiple producers on the same topic
- ✅ Critical data pipelines
When to skip:
- Schema evolution in progress
- Flexible data formats
- Mixed content topics
Subscription Types
Shared (Most Common)
Behavior: Messages load-balanced across all active consumers
Use cases:
- High-volume parallel processing
- Horizontal scaling
- Order doesn't matter within topic
Example: Processing events where each event is independent
Exclusive
Behavior: Only one consumer receives messages, strictly ordered
Use cases:
- Order-dependent processing
- Sequential operations
- Single-threaded requirements
Example: Bank transactions that must be processed in order
Failover
Behavior: Active-passive setup with automatic failover
Use cases:
- Ordered processing with high availability
- Standby consumer for reliability
- Graceful failover
Example: Critical pipeline with backup consumer
Multi-Topic Routing
Pattern
Each route is independent:
- Different target entities (tables, collections, indexes)
- Different transformations
- Different schema expectations
Mechanism
- Store a topic → config mapping in the connector
- Lookup configuration for each record's topic
- Apply topic-specific logic for transformation
- Route to target entity (table, collection, etc.)
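The mechanism above can be sketched with a `HashMap`-backed router; `RouteTarget` and the `Router` shape are hypothetical, not SDK types:

```rust
use std::collections::HashMap;

// Hypothetical route config: which target table a topic maps to.
#[derive(Debug, Clone, PartialEq)]
struct RouteTarget {
    table: String,
}

struct Router {
    by_topic: HashMap<String, RouteTarget>,
}

impl Router {
    // Build the topic → target map once, during initialize().
    fn new(routes: &[(&str, &str)]) -> Self {
        let by_topic = routes
            .iter()
            .map(|(from, to)| ((*from).to_string(), RouteTarget { table: (*to).to_string() }))
            .collect();
        Router { by_topic }
    }

    // Look up the target per record inside process_batch().
    fn target_for(&self, topic: &str) -> Option<&RouteTarget> {
        self.by_topic.get(topic)
    }
}
```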
Configuration Example
[[my_sink.routes]]
from = "/events/clicks"
subscription = "analytics-clicks"
to = "clicks"
[[my_sink.routes]]
from = "/events/orders"
subscription = "analytics-orders"
to = "orders"
Main Entry Point
#[tokio::main]
async fn main() -> ConnectorResult<()> {
// 1. Initialize logging
tracing_subscriber::fmt::init();
// 2. Load configuration
let config = MySinkConfig::load()?;
// 3. Create connector with settings
let connector = MySinkConnector::new(config.my_sink)?;
// 4. Create and run runtime (handles everything else)
let mut runtime = SinkRuntime::new(connector, config.core).await?;
runtime.run().await
}
Runtime handles:
- Danube connection
- Consumer creation with subscription configs
- Message consumption and buffering
- Schema fetching and deserialization
- Calling `process_batch()`
- Retries and error handling
- Lifecycle & monitoring
- Metrics & health
Best Practices
Configuration
- ✅ Use flattened `ConnectorConfig` at the root
- ✅ Keep secrets in environment variables, not TOML
- ✅ Validate configuration at startup
- ✅ Keep the route structure consistent with `routes`, `from`, and `to`
Data Processing
- ✅ Use `process_batch()` for performance (10-100x faster)
- ✅ Group records by target entity in multi-topic scenarios
- ✅ Make metadata addition optional (configurable)
- ✅ Handle missing fields gracefully
Runtime Settings
- ✅ Tune `[processing].batch_size` and `[processing].batch_timeout_ms`
- ✅ Use `metrics_port` and health-check settings from the shared config
- ✅ Avoid duplicate connector-local batching knobs unless they are platform-specific
- ✅ Document shared runtime tuning instead of connector-owned flush policy
Error Handling
- ✅ `Retryable` for temporary failures (rate limits, network)
- ✅ `Fatal` for permanent failures (auth, config)
- ✅ Return `Ok(())` only after the batch succeeded or any skippable records were handled inside the connector
- ✅ Log detailed error context
Subscription Types
- ✅ Use `Shared` for scalable parallel processing (default)
- ✅ Use `Exclusive` only when ordering is critical
- ✅ Use `FailOver` for ordered processing + high availability
- ✅ Document ordering requirements
Performance
- ✅ Use connection pooling for concurrent writes
- ✅ Batch operations whenever possible
- ✅ Configure appropriate timeouts
- ✅ Monitor backpressure and lag
Common Patterns
Direct Write
Receive data, transform, write directly. Simplest approach for low-volume or low-latency requirements.
Buffered Batch Write
Let the runtime buffer records until batch size/timeout is reached, then bulk write in process_batch(). Best for throughput.
Grouped Multi-Topic Write
Group records by target entity, write each group as batch. Efficient for multi-topic routing.
Enriched Write
Add metadata from Danube (topic, publish time, producer, attributes) to records before writing. Useful for auditing.
Upsert Pattern
Use record attributes or payload fields to determine insert vs. update. Common for CDC scenarios.
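The insert-vs-update decision can be sketched as a small classifier. The `op` attribute name and its values are an illustrative convention, not a Danube API:

```rust
use std::collections::HashMap;

// Stand-in write operation for an upsert-capable sink.
#[derive(Debug, PartialEq)]
enum WriteOp { Insert, Update }

// Decide insert vs. update from record attributes, as in CDC-style sinks.
// The "op" attribute key and "u"/"update" values are assumed conventions.
fn classify_op(attributes: &HashMap<String, String>) -> WriteOp {
    match attributes.get("op").map(String::as_str) {
        Some("u") | Some("update") => WriteOp::Update,
        _ => WriteOp::Insert, // default: treat as a new row
    }
}
```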
Testing
Local Development
- Start Danube: `docker-compose up -d`
- Set config: `export CONNECTOR_CONFIG_PATH=./config/connector.toml`
- Run: `cargo run`
- Produce: `danube-cli produce --topic /events/test --message '{"test":"data"}'`
- Verify: check the target system for data
Unit Tests
- Test transformation logic with sample `SinkRecord`s
- Test topic routing and configuration lookup
- Mock external system for isolated testing
- Validate error handling paths
Integration Tests
- Full stack: Danube + Connector + Target system
- Produce messages with schemas
- Verify data in target system
- Test batching behavior
- Test error scenarios
Summary
You implement:
SinkConnectortrait (3 required, 2 optional methods)- Configuration with flattened
ConnectorConfig - External system integration (connection and writes)
- Message transformation from
SinkRecordto target format - Topic routing and multi-topic handling
Runtime handles:
- Danube connection & consumption
- Schema fetching & deserialization
- Message buffering, batching, and delivery
- Calling your `process_batch` method
- Retries & error handling
- Lifecycle & monitoring
- Metrics & health
Remember:
- Configuration uses flattened `ConnectorConfig` plus connector-specific `routes`
- Schemas are handled by the runtime - you receive typed data
- `expected_schema_subject` is optional verification, not validation
- Use `process_batch()` for best performance
- Focus on external system integration and data transformation