Producer Guide
Producers send messages to Danube topics. This guide covers both basic and advanced producer capabilities.
Creating a Producer
Simple Producer
The minimal setup to send messages:
Rust:
    use danube_client::DanubeClient;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Connect to the broker
        let client = DanubeClient::builder()
            .service_url("http://127.0.0.1:6650")
            .build()
            .await?;

        // Configure the producer
        let mut producer = client
            .new_producer()
            .with_topic("/default/my-topic")
            .with_name("my-producer")
            .build()?;

        // Register the producer with the broker
        producer.create().await?;
        println!("✅ Producer created");
        Ok(())
    }
Go:
    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/danube-messaging/danube-go"
    )

    func main() {
        // Connect to the broker
        client, err := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()
        if err != nil {
            log.Fatalf("Failed to create client: %v", err)
        }
        ctx := context.Background()

        // Configure the producer
        producer, err := client.NewProducer().
            WithName("my-producer").
            WithTopic("/default/my-topic").
            Build()
        if err != nil {
            log.Fatalf("Failed to create producer: %v", err)
        }

        // Register the producer with the broker
        if err := producer.Create(ctx); err != nil {
            log.Fatalf("Failed to initialize producer: %v", err)
        }
        fmt.Println("✅ Producer created")
    }
Key concepts:
- Topic: Destination for messages (e.g., /default/my-topic)
- Name: Unique producer identifier
- Create: Registers producer with broker before sending
Sending Messages
Byte Messages
Send raw byte data. Sending returns a unique message ID for tracking.
Messages with Attributes
Add metadata to messages.
Use cases:
- Routing hints
- Message metadata
- Custom headers
- Tracing IDs
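As a rough illustration of the two sections above, the sketch below pairs a raw byte payload with a few attributes. `OutgoingMessage` and `build_message` are hypothetical names that are not part of the Danube client, and the sketch assumes attributes are string key/value pairs; check the client examples for the actual send call.

```rust
use std::collections::HashMap;

/// Hypothetical container pairing a raw payload with its attributes.
/// Illustrative only; this type is not part of the Danube client.
#[derive(Debug, Clone, PartialEq)]
pub struct OutgoingMessage {
    pub payload: Vec<u8>,
    pub attributes: HashMap<String, String>,
}

/// Builds a message from text plus a tracing ID and a routing hint.
/// Assumption: attributes are plain string key/value pairs.
pub fn build_message(body: &str, trace_id: &str, route: &str) -> OutgoingMessage {
    let mut attributes = HashMap::new();
    attributes.insert("trace-id".to_string(), trace_id.to_string());
    attributes.insert("route".to_string(), route.to_string());
    attributes.insert("content-type".to_string(), "text/plain".to_string());

    OutgoingMessage {
        // The payload is just the UTF-8 bytes of the text.
        payload: body.as_bytes().to_vec(),
        attributes,
    }
}
```

Keeping the payload opaque and putting routing or tracing data in attributes means consumers can inspect the metadata without decoding the body.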
Partitioned Topics
Partitions enable horizontal scaling by distributing messages across multiple brokers.
Creating Partitioned Producers
Rust:
    use danube_client::DanubeClient;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = DanubeClient::builder()
            .service_url("http://127.0.0.1:6650")
            .build()
            .await?;

        let mut producer = client
            .new_producer()
            .with_topic("/default/high-throughput")
            .with_name("partitioned-producer")
            .with_partitions(3) // Create 3 partitions
            .build()?;

        producer.create().await?;
        Ok(())
    }
Go:
    package main

    import (
        "context"
        "log"

        "github.com/danube-messaging/danube-go"
    )

    func main() {
        client, err := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()
        if err != nil {
            log.Fatalf("Failed to create client: %v", err)
        }
        ctx := context.Background()

        producer, err := client.NewProducer().
            WithName("partitioned-producer").
            WithTopic("/default/high-throughput").
            WithPartitions(3). // Create 3 partitions
            Build()
        if err != nil {
            log.Fatalf("Failed to create producer: %v", err)
        }

        if err := producer.Create(ctx); err != nil {
            log.Fatalf("Failed to initialize producer: %v", err)
        }
    }
What happens:
- Topic splits into N partitions (e.g., -part-0, -part-1, -part-2).
- Producer routes messages round-robin across partitions.
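The round-robin routing described above can be sketched in a few lines. This illustrates the idea only and is not Danube's internal implementation: `RoundRobinRouter` is a hypothetical type, and the sketch assumes the `-part-N` suffix is appended directly to the topic name.

```rust
/// Hypothetical round-robin router; illustrates the routing idea only.
pub struct RoundRobinRouter {
    topic: String,
    partitions: usize,
    next: usize,
}

impl RoundRobinRouter {
    /// Creates a router for `partitions` partitions. Panics if `partitions` is 0.
    pub fn new(topic: &str, partitions: usize) -> Self {
        assert!(partitions > 0, "a partitioned topic needs at least one partition");
        Self {
            topic: topic.to_string(),
            partitions,
            next: 0,
        }
    }

    /// Returns the partition topic for the next message and advances the cursor.
    /// Assumption: partition names are "<topic>-part-<index>".
    pub fn next_partition(&mut self) -> String {
        let index = self.next;
        // Wrap around after the last partition.
        self.next = (self.next + 1) % self.partitions;
        format!("{}-part-{}", self.topic, index)
    }
}
```

With three partitions, successive calls cycle through indexes 0, 1, 2 and then return to 0, so each partition receives roughly the same share of messages.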
Reliable Dispatch
Reliable dispatch guarantees message delivery by persisting messages before acknowledging sends.
Enabling Reliable Dispatch
Rust:
    use danube_client::DanubeClient;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = DanubeClient::builder()
            .service_url("http://127.0.0.1:6650")
            .build()
            .await?;

        let mut producer = client
            .new_producer()
            .with_topic("/default/critical-events")
            .with_name("reliable-producer")
            .with_reliable_dispatch() // Enable persistence
            .build()?;

        producer.create().await?;
        Ok(())
    }
Go:
    package main

    import (
        "context"
        "log"

        "github.com/danube-messaging/danube-go"
    )

    func main() {
        client, err := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()
        if err != nil {
            log.Fatalf("Failed to create client: %v", err)
        }
        ctx := context.Background()

        // Configure reliable dispatch strategy
        reliableStrategy := danube.NewReliableDispatchStrategy()

        producer, err := client.NewProducer().
            WithName("reliable-producer").
            WithTopic("/default/critical-events").
            WithDispatchStrategy(reliableStrategy).
            Build()
        if err != nil {
            log.Fatalf("Failed to create producer: %v", err)
        }

        if err := producer.Create(ctx); err != nil {
            log.Fatalf("Failed to initialize producer: %v", err)
        }
    }
Note: Reliable dispatch persists each message before acknowledging the send. Use it for critical events that must not be lost.
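The persist-before-acknowledge ordering can be pictured with a small model. This is a conceptual sketch, not how the Danube broker is implemented: `DurableLog` and `Ack` are hypothetical types, and an in-memory `Vec` stands in for durable storage.

```rust
/// Hypothetical stand-in for durable storage. A real broker would write to
/// disk or replicated storage; a Vec is used here only for illustration.
#[derive(Default)]
pub struct DurableLog {
    entries: Vec<Vec<u8>>,
}

/// Acknowledgement that is only constructed after the message is stored.
#[derive(Debug, PartialEq)]
pub struct Ack {
    pub message_id: u64,
}

impl DurableLog {
    /// Stores the payload first, then builds the acknowledgement.
    pub fn persist_then_ack(&mut self, payload: &[u8]) -> Ack {
        // Step 1: persist the message.
        self.entries.push(payload.to_vec());
        // Step 2: acknowledge, using the log offset as the message ID.
        let message_id = (self.entries.len() - 1) as u64;
        Ack { message_id }
    }

    /// Looks up a stored message by the ID returned in its acknowledgement.
    pub fn get(&self, message_id: u64) -> Option<&[u8]> {
        self.entries.get(message_id as usize).map(|e| e.as_slice())
    }
}
```

Because the acknowledgement is built only after the write, any message ID a producer has received refers to a message that is already stored.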
Schema Integration
Link producers to schemas for type safety (see Schema Registry for details).
Minimal Schema Usage
Go:
    // Assumes client and ctx are set up as in the earlier examples.
    producer, err := client.NewProducer().
        WithTopic("/default/events").
        WithName("schema-producer").
        WithSchemaSubject("event-schema").
        Build()
    if err != nil {
        log.Fatalf("Failed to build producer: %v", err)
    }

    if err := producer.Create(ctx); err != nil {
        log.Fatalf("Failed to create producer: %v", err)
    }
For schema registration and versioning, see Schema Registry.
Full Examples
For complete runnable producers, see the client repositories:
- Go: https://github.com/danube-messaging/danube-go/tree/main/examples
- Rust: https://github.com/danube-messaging/danube/tree/main/danube-client/examples
Topic Naming
Topic Format
Topics follow a namespace structure, /{namespace}/{topic-name}.
Examples:
- /default/orders - Orders in default namespace
- /production/user-events - User events in production namespace
- /staging/logs - Logs in staging namespace
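A client-side check for this format might look like the sketch below. `parse_topic` is a hypothetical helper, and the rule it enforces (a leading slash followed by exactly two non-empty segments) is an assumption inferred from the examples above; the broker may apply different or stricter validation.

```rust
/// Splits a topic such as "/default/orders" into (namespace, topic name).
/// Returns None unless the input is a leading slash followed by exactly two
/// non-empty segments. These rules are an assumption, not Danube's own.
pub fn parse_topic(topic: &str) -> Option<(&str, &str)> {
    // Require and drop the leading slash.
    let rest = topic.strip_prefix('/')?;
    // Split at the first remaining slash into namespace and name.
    let (namespace, name) = rest.split_once('/')?;
    if namespace.is_empty() || name.is_empty() || name.contains('/') {
        return None;
    }
    Some((namespace, name))
}
```

Checking the name before building a producer turns a malformed topic into an early local error instead of a failed request to the broker.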